Skip to content

Commit

Permalink
Pass size to proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
AlessandroPatti committed Aug 2, 2023
1 parent 728b2d6 commit 21fb101
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 27 deletions.
4 changes: 2 additions & 2 deletions cache/azblobproxy/azblobproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *azBlobCache) Put(ctx context.Context, kind cache.EntryKind, hash string
}
}

func (c *azBlobCache) Get(ctx context.Context, kind cache.EntryKind, hash string) (rc io.ReadCloser, size int64, err error) {
func (c *azBlobCache) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (rc io.ReadCloser, size int64, err error) {
key := c.objectKey(hash, kind)
if c.prefix != "" {
key = c.prefix + "/" + key
Expand Down Expand Up @@ -111,7 +111,7 @@ func (c *azBlobCache) Get(ctx context.Context, kind cache.EntryKind, hash string

var errNotFound = errors.New("NOT FOUND")

func (c *azBlobCache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (c *azBlobCache) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {
key := c.objectKey(hash, kind)
if c.prefix != "" {
key = c.prefix + "/" + key
Expand Down
4 changes: 2 additions & 2 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ type Proxy interface {
// `hash` can be read, its logical size, and an error if something went
// wrong. The data available from `rc` is in the same format as used by
// the disk.Cache instance.
Get(ctx context.Context, kind EntryKind, hash string) (rc io.ReadCloser, size int64, err error)
Get(ctx context.Context, kind EntryKind, hash string, size int64) (io.ReadCloser, int64, error)

// Contains returns whether or not the cache item exists on the
// remote end, and the size if it exists (and -1 if the size is
// unknown).
Contains(ctx context.Context, kind EntryKind, hash string) (bool, int64)
Contains(ctx context.Context, kind EntryKind, hash string, size int64) (bool, int64)
}

// TransformActionCacheKey takes an ActionCache key and an instance name
Expand Down
4 changes: 2 additions & 2 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string,
return nil, -1, nil
}

r, foundSize, err := c.proxy.Get(ctx, kind, hash)
r, foundSize, err := c.proxy.Get(ctx, kind, hash, size)
if r != nil {
defer r.Close()
}
Expand Down Expand Up @@ -731,7 +731,7 @@ func (c *diskCache) Contains(ctx context.Context, kind cache.EntryKind, hash str
}

if c.proxy != nil && size <= c.maxProxyBlobSize {
exists, foundSize = c.proxy.Contains(ctx, kind, hash)
exists, foundSize = c.proxy.Contains(ctx, kind, hash, size)
if exists && foundSize <= c.maxProxyBlobSize && !isSizeMismatch(size, foundSize) {
return true, foundSize
}
Expand Down
4 changes: 2 additions & 2 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (d proxyStub) Put(ctx context.Context, kind cache.EntryKind, hash string, l
// Not implemented.
}

func (d proxyStub) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
func (d proxyStub) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (io.ReadCloser, int64, error) {
if hash != contentsHash || kind != cache.CAS {
return nil, -1, nil
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (d proxyStub) Get(ctx context.Context, kind cache.EntryKind, hash string) (
return readme, contentsLength, nil
}

func (d proxyStub) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (d proxyStub) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {
if hash != contentsHash || kind != cache.CAS {
return false, -1
}
Expand Down
2 changes: 1 addition & 1 deletion cache/disk/findmissing.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *diskCache) containsWorker() {
}
}

ok, _ = c.proxy.Contains(req.ctx, cache.CAS, (*req.digest).Hash)
ok, _ = c.proxy.Contains(req.ctx, cache.CAS, (*req.digest).Hash, (*req.digest).SizeBytes)
if ok {
c.accessLogger.Printf("GRPC CAS HEAD %s OK", (*req.digest).Hash)
// The blob exists on the proxy, remove it from the
Expand Down
8 changes: 4 additions & 4 deletions cache/disk/findmissing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ type testCWProxy struct {
func (p *testCWProxy) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
}

func (p *testCWProxy) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
func (p *testCWProxy) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (io.ReadCloser, int64, error) {
return nil, -1, nil
}

func (p *testCWProxy) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (p *testCWProxy) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {
if kind == cache.CAS && hash == p.blob {
return true, 42
}
Expand Down Expand Up @@ -176,11 +176,11 @@ func (p *proxyAdapter) Put(ctx context.Context, kind cache.EntryKind, hash strin
}
}

func (p *proxyAdapter) Get(ctx context.Context, kind cache.EntryKind, hash string) (rc io.ReadCloser, size int64, err error) {
func (p *proxyAdapter) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (rc io.ReadCloser, size int64, err error) {
return p.cache.Get(ctx, kind, hash, size, 0)
}

func (p *proxyAdapter) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (p *proxyAdapter) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {
return p.cache.Contains(ctx, kind, hash, -1)
}

Expand Down
45 changes: 35 additions & 10 deletions cache/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *remoteGrpcProxyCache) fetchBlobDigest(ctx context.Context, hash string)
return res.BlobDigest, nil
}

func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64) (io.ReadCloser, int64, error) {
switch kind {
case cache.RAW:
// RAW cache entries are a special case of AC, used when --disable_http_ac_validation
Expand Down Expand Up @@ -274,11 +274,14 @@ func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha
return io.NopCloser(bytes.NewReader(data)), int64(len(data)), nil

case cache.CAS:
digest, err := r.fetchBlobDigest(ctx, hash)
if err != nil {
logResponse(r.errorLogger, "FetchBlob", err.Error(), hash)
if size < 0 {
// We don't know the size, so send a FetchBlob request first to get the digest
digest, err := r.fetchBlobDigest(ctx, hash)
if err != nil {
logResponse(r.errorLogger, "FetchBlob", err.Error(), hash)
}
size = digest.SizeBytes
}
size := digest.SizeBytes

req := bs.ReadRequest{
ResourceName: fmt.Sprintf("blobs/%s/%d", hash, size),
Expand All @@ -295,7 +298,7 @@ func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha
}
}

func (r *remoteGrpcProxyCache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (r *remoteGrpcProxyCache) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) {
switch kind {
case cache.RAW:
// RAW cache entries are a special case of AC, used when --disable_http_ac_validation
Expand All @@ -305,20 +308,42 @@ func (r *remoteGrpcProxyCache) Contains(ctx context.Context, kind cache.EntryKin
// There's not "contains" method for the action cache so the best we can do
// is to get the object and discard the result
// We don't expect this to ever be called anyways since it is not part of the grpc protocol
rc, size, err := r.Get(ctx, kind, hash)
rc, size, err := r.Get(ctx, kind, hash, size)
rc.Close()
if err != nil || size < 0 {
return false, -1
}
return true, size
case cache.CAS:
digest, err := r.fetchBlobDigest(ctx, hash)
if size < 0 {
// If don't know the size, use the remote asset api to find the missing blob
digest, err := r.fetchBlobDigest(ctx, hash)
if err != nil {
logResponse(r.errorLogger, "Contains", err.Error(), hash)
return false, -1
}
logResponse(r.accessLogger, "Contains", "Success", hash)
return true, digest.SizeBytes
}

// If we know the size, prefer using the remote execution api
req := &pb.FindMissingBlobsRequest{
BlobDigests: []*pb.Digest{{
Hash: hash,
SizeBytes: size,
}},
}
res, err := r.clients.cas.FindMissingBlobs(ctx, req)
if err != nil {
logResponse(r.errorLogger, "Contains", err.Error(), hash)
return false, -1
}
logResponse(r.accessLogger, "Contains", "Success", hash)
return true, digest.SizeBytes
for range res.MissingBlobDigests {
logResponse(r.accessLogger, "Contains", "Not Found", hash)
return false, -1
}
logResponse(r.errorLogger, "Contains", "Success", hash)
return true, size
default:
logResponse(r.errorLogger, "Contains", "Unexpected kind", kind.String())
return false, -1
Expand Down
4 changes: 2 additions & 2 deletions cache/httpproxy/httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (r *remoteHTTPProxyCache) Put(ctx context.Context, kind cache.EntryKind, ha
}
}

func (r *remoteHTTPProxyCache) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
func (r *remoteHTTPProxyCache) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (io.ReadCloser, int64, error) {
url := r.requestURL(hash, kind)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *remoteHTTPProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha
return rsp.Body, sizeBytes, nil
}

func (r *remoteHTTPProxyCache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (r *remoteHTTPProxyCache) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {

url := r.requestURL(hash, kind)

Expand Down
4 changes: 2 additions & 2 deletions cache/s3proxy/s3proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (c *s3Cache) UpdateModificationTimestamp(ctx context.Context, bucket string
logResponse(c.accessLogger, "COMPOSE", bucket, object, err)
}

func (c *s3Cache) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
func (c *s3Cache) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (io.ReadCloser, int64, error) {

rc, info, _, err := c.mcore.GetObject(
ctx,
Expand Down Expand Up @@ -236,7 +236,7 @@ func (c *s3Cache) Get(ctx context.Context, kind cache.EntryKind, hash string) (i
return rc, info.Size, nil
}

func (c *s3Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
func (c *s3Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) {
size := int64(-1)
exists := false

Expand Down

0 comments on commit 21fb101

Please sign in to comment.