
Add grpc proxy backend #680

Closed · wants to merge 17 commits
13 changes: 13 additions & 0 deletions README.md
@@ -243,6 +243,19 @@ OPTIONS:
Goroutines to process parallel uploads to backend. (default: 100)
[$BAZEL_REMOTE_NUM_UPLOADERS]

--grpc_proxy.url value The base URL to use for a grpc proxy backend.
Collaborator: I think we need an example URL here, so the form is obvious.

Contributor Author: Sure, added it.

[$BAZEL_REMOTE_GRPC_PROXY_URL]

--grpc_proxy.key_file value Path to a key used to authenticate with the
proxy backend using mTLS. If this flag is provided, then
grpc_proxy.cert_file must also be specified.
[$BAZEL_REMOTE_GRPC_PROXY_KEY_FILE]

--grpc_proxy.cert_file value Path to a certificate used to authenticate
with the proxy backend using mTLS. If this flag is provided, then
grpc_proxy.key_file must also be specified.
[$BAZEL_REMOTE_GRPC_PROXY_CERT_FILE]

Collaborator: How difficult would it be to add support for basic/password authentication? I think we should consider adding this, mostly to avoid documenting that it doesn't work, but also because it would probably make a system test easier to set up.

Contributor Author: Password authentication is currently used only for clients to authenticate with bazel-remote, not for bazel-remote to authenticate with the proxy backend. As far as I can tell, none of the existing proxies support this right now.

Collaborator: The bazel-remote <-> proxy backend authentication is completely separate from the client <-> bazel-remote authentication, though. If bazel-remote is the main intended use case for the grpc proxy backend, then I think we should support that. But this would be OK to land in a followup (I can help if needed).

Contributor Author: I did not realise that the http proxy actually supports basic auth; you just need to pass the credentials in the proxy URL. I took a stab at doing the same for the grpc proxy.
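For reference, the credentials-in-URL approach could look roughly like this sketch. The helper name and the example URL are illustrative, not the PR's actual code; a gRPC proxy could attach the resulting header value as per-RPC metadata.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/url"
)

// basicAuthHeader extracts user:password credentials embedded in a proxy
// URL (e.g. grpcs://user:pass@cache.example.com:9092) and returns the
// value of a "Basic" authorization header. Hypothetical helper.
func basicAuthHeader(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	if u.User == nil {
		return "", nil // no credentials embedded in the URL
	}
	pass, _ := u.User.Password()
	token := base64.StdEncoding.EncodeToString([]byte(u.User.Username() + ":" + pass))
	return "Basic " + token, nil
}

func main() {
	h, _ := basicAuthHeader("grpcs://alice:secret@cache.example.com:9092")
	fmt.Println(h) // prints "Basic YWxpY2U6c2VjcmV0"
}
```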

--http_proxy.url value The base URL to use for a http proxy backend.
[$BAZEL_REMOTE_HTTP_PROXY_URL]

41 changes: 41 additions & 0 deletions cache/grpcproxy/BUILD.bazel
@@ -0,0 +1,41 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"grpcproxy.go",
"readcloser.go",
],
importpath = "github.com/buchgr/bazel-remote/v2/cache/grpcproxy",
visibility = ["//visibility:public"],
deps = [
"//cache:go_default_library",
"//genproto/build/bazel/remote/asset/v1:go_default_library",
"//genproto/build/bazel/remote/execution/v2:go_default_library",
"//utils/backendproxy:go_default_library",
"@com_github_google_uuid//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["grpcproxy_test.go"],
embed = [":go_default_library"],
deps = [
"//cache:go_default_library",
"//cache/disk:go_default_library",
"//genproto/build/bazel/remote/execution/v2:go_default_library",
"//server:go_default_library",
"//utils:go_default_library",
"@com_github_google_uuid//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
316 changes: 316 additions & 0 deletions cache/grpcproxy/grpcproxy.go
@@ -0,0 +1,316 @@
package grpcproxy

import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"

"github.com/buchgr/bazel-remote/v2/cache"
"github.com/buchgr/bazel-remote/v2/utils/backendproxy"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

asset "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/asset/v1"
pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"
bs "google.golang.org/genproto/googleapis/bytestream"
)

type GrpcClients struct {
asset asset.FetchClient
bs bs.ByteStreamClient
ac pb.ActionCacheClient
cas pb.ContentAddressableStorageClient
}

func NewGrpcClients(cc *grpc.ClientConn) *GrpcClients {
return &GrpcClients{
asset: asset.NewFetchClient(cc),
bs: bs.NewByteStreamClient(cc),
ac: pb.NewActionCacheClient(cc),
cas: pb.NewContentAddressableStorageClient(cc),
}
}

type remoteGrpcProxyCache struct {
clients *GrpcClients
uploadQueue chan<- backendproxy.UploadReq
accessLogger cache.Logger
errorLogger cache.Logger
}

func New(clients *GrpcClients,
accessLogger cache.Logger, errorLogger cache.Logger,
numUploaders, maxQueuedUploads int) (cache.Proxy, error) {
Collaborator: This function only seems to return a nil error, do we need that return value?

On the other hand, maybe this should check the capabilities of the backend server, and possibly return an error from that?

Contributor Author: Addressed both!


proxy := &remoteGrpcProxyCache{
clients: clients,
accessLogger: accessLogger,
errorLogger: errorLogger,
}

proxy.uploadQueue = backendproxy.StartUploaders(proxy, numUploaders, maxQueuedUploads)

return proxy, nil
}

// Helper function for logging responses
func logResponse(logger cache.Logger, method string, msg string, resource string) {
logger.Printf("GRPC PROXY %s %s: %s", method, msg, resource)
}

func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) {
defer item.Rc.Close()

switch item.Kind {
case cache.RAW:
// RAW cache entries are a special case of AC, used when --disable_http_ac_validation
// is enabled. We can treat them as AC in this scope
fallthrough
case cache.AC:
data := make([]byte, item.SizeOnDisk)
read := int64(0)
for {
n, err := item.Rc.Read(data[read:])
if n > 0 {
read += int64(n)
}
if err != nil || read == item.SizeOnDisk {
break
}
}
if read != item.SizeOnDisk {
logResponse(r.errorLogger, "AC Upload", "Unexpected short read", item.Hash)
return
}
ar := &pb.ActionResult{}
err := proto.Unmarshal(data, ar)
if err != nil {
logResponse(r.errorLogger, "AC Upload", err.Error(), item.Hash)
return
}
digest := &pb.Digest{
Hash: item.Hash,
SizeBytes: item.LogicalSize,
}

req := &pb.UpdateActionResultRequest{
ActionDigest: digest,
ActionResult: ar,
}
_, err = r.clients.ac.UpdateActionResult(context.Background(), req)
if err != nil {
logResponse(r.errorLogger, "AC Upload", err.Error(), item.Hash)
}
return
case cache.CAS:
stream, err := r.clients.bs.Write(context.Background())
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), item.Hash)
return
}

buf := make([]byte, 2*1024*1024)
Collaborator: Why 2M? The blob might be much smaller...

Contributor Author: Just copied it from grpc_bytestream.go but forgot to cap it at the blob size, thanks!
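The fix discussed here amounts to capping the chunk buffer at the blob size, roughly as below (helper name hypothetical):

```go
package main

import "fmt"

const maxChunkSize = 2 * 1024 * 1024 // 2M, matching grpc_bytestream.go

// chunkBufSize returns the bytestream chunk buffer size for a blob:
// the usual 2M chunk, but never larger than the blob itself.
func chunkBufSize(blobSize int64) int64 {
	if blobSize < maxChunkSize {
		return blobSize
	}
	return maxChunkSize
}

func main() {
	fmt.Println(chunkBufSize(512))     // prints 512
	fmt.Println(chunkBufSize(8 << 20)) // prints 2097152
}
```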

resourceName := fmt.Sprintf("uploads/%s/blobs/%s/%d", uuid.New().String(), item.Hash, item.SizeOnDisk)
Collaborator: Should this use item.LogicalSize instead?

Contributor Author: Yes, it should, thanks for catching that!

firstIteration := true
for {
n, err := item.Rc.Read(buf)
if err != nil && err != io.EOF {
logResponse(r.errorLogger, "Write", err.Error(), resourceName)
err := stream.CloseSend()
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), resourceName)
}
return
}
if n > 0 {
rn := ""
if firstIteration {
firstIteration = false
rn = resourceName
}
req := &bs.WriteRequest{
ResourceName: rn,
Data: buf[:n],
}
err := stream.Send(req)
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), resourceName)
return
}
} else {
_, err = stream.CloseAndRecv()
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), resourceName)
return
}
logResponse(r.accessLogger, "Write", "Success", resourceName)
return
}
}
default:
logResponse(r.errorLogger, "Write", "Unexpected kind", item.Kind.String())
return
}
}

func (r *remoteGrpcProxyCache) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
if r.uploadQueue == nil {
rc.Close()
return
}

item := backendproxy.UploadReq{
Hash: hash,
LogicalSize: logicalSize,
SizeOnDisk: sizeOnDisk,
Kind: kind,
Rc: rc,
}

select {
case r.uploadQueue <- item:
default:
r.errorLogger.Printf("too many uploads queued")
rc.Close()
}
}

func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
switch kind {
case cache.RAW:
// RAW cache entries are a special case of AC, used when --disable_http_ac_validation
// is enabled. We can treat them as AC in this scope
fallthrough
case cache.AC:
digest := pb.Digest{
Hash: hash,
SizeBytes: -1,
}

req := &pb.GetActionResultRequest{ActionDigest: &digest}

res, err := r.clients.ac.GetActionResult(ctx, req)
status, ok := status.FromError(err)
if ok && status.Code() == codes.NotFound {
return nil, -1, nil
}

if err != nil {
logResponse(r.errorLogger, "GetActionResult", err.Error(), digest.Hash)
return nil, -1, err
}
data, err := proto.Marshal(res)
if err != nil {
logResponse(r.errorLogger, "GetActionResult", err.Error(), digest.Hash)
return nil, -1, err
}

logResponse(r.accessLogger, "GetActionResult", "Success", digest.Hash)
return io.NopCloser(bytes.NewReader(data)), int64(len(data)), nil

case cache.CAS:
// We don't know the size, so send a FetchBlob request first to get the digest
// TODO: consider passing the expected blob size to the proxy?
Collaborator: This has been one of the complicating factors that I have been putting off dealing with in order to add a grpc proxy backend...

The first proxy backends were added to bazel-remote before it had gRPC support, and were designed for bazel's simpler remote cache REST API. In particular, the REST API only specifies the hash but not the size of CAS blobs, whereas the gRPC API uses hash/size digests.

One way to handle this would be to add an optional size argument to the Proxy functions, pass -1 if unknown, and let the proxy backend use the value if non-negative.

Another, probably more complicated option would be to try refactoring a little, to add an alternate path for gRPC proxy backends from the gRPC code. While the setup might be more complicated, it could probably be more efficient, because there would be fewer calls (e.g. FindMissingBlobs, BatchUpdateBlobs and BatchReadBlobs wouldn't require multiple calls to the backend). I am not sure what this would look like, but I think it's worth analysing before accepting this feature.

Contributor Author: I think using an alternate path for the backend risks making this very specific to bazel-remote and those servers that implement the same API. Instead, by using the bytestream and remote execution APIs, we make potentially any cache solution a possible grpc backend.

I took a stab at passing the size down to the proxy; the changes are simple enough and allow us to skip the additional requests entirely, provided the size is > 0.
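The size-aware path described here could be sketched as follows (helper name hypothetical): when the caller already knows the digest size, the bytestream resource name can be built directly and the FetchBlob round trip skipped, with -1 meaning the size is unknown.

```go
package main

import "fmt"

// casResourceName builds a bytestream read resource name when the blob
// size is already known; ok is false when the proxy must first discover
// the size (e.g. via FetchBlob). A size of -1 means "unknown".
// Hypothetical helper illustrating the discussion, not the PR's code.
func casResourceName(hash string, size int64) (name string, ok bool) {
	if size > 0 {
		return fmt.Sprintf("blobs/%s/%d", hash, size), true
	}
	return "", false
}

func main() {
	if name, ok := casResourceName("abc123", 42); ok {
		fmt.Println(name) // prints "blobs/abc123/42"
	}
}
```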

decoded, err := hex.DecodeString(hash)
Collaborator: Why decode the string, just to encode it again below?

Contributor Author: First we decode it into a byte array, then we encode it into a base64 string, doing the opposite of what is done in https://github.com/buchgr/bazel-remote/blob/master/server/grpc_asset.go#L74-L81
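Concretely, the round trip converts the lowercase-hex digest that bazel-remote stores into the raw bytes that the base64-encoded checksum.sri qualifier requires (helper name hypothetical):

```go
package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// sriFromHex converts a lowercase-hex SHA-256 digest (bazel-remote's
// storage form) into the "sha256-<base64>" subresource-integrity string
// that the checksum.sri qualifier expects (the inverse of the decoding
// done in server/grpc_asset.go).
func sriFromHex(hexHash string) (string, error) {
	raw, err := hex.DecodeString(hexHash)
	if err != nil {
		return "", err
	}
	return "sha256-" + base64.StdEncoding.EncodeToString(raw), nil
}

func main() {
	// SHA-256 of the empty blob.
	s, _ := sriFromHex("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
	fmt.Println(s) // prints "sha256-47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="
}
```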

if err != nil {
return nil, -1, err
}
q := asset.Qualifier{
Name: "checksum.sri",
Value: fmt.Sprintf("sha256-%s", base64.StdEncoding.EncodeToString(decoded)),
}
freq := asset.FetchBlobRequest{
Collaborator: I find this a bit confusing, why is this using the remote asset API instead of BatchReadBlobs or the bytestream API?

Contributor Author: We need to know the size of the blob first. The alternative is passing it as a parameter to this function.

Uris: []string{},
Qualifiers: []*asset.Qualifier{&q},
}

res, err := r.clients.asset.FetchBlob(ctx, &freq)
if err != nil {
logResponse(r.errorLogger, "FetchBlob", err.Error(), hash)
return nil, -1, err
}

if res.Status.GetCode() == int32(codes.NotFound) {
logResponse(r.accessLogger, "FetchBlob", res.Status.Message, hash)
return nil, -1, nil
}
if res.Status.GetCode() != int32(codes.OK) {
logResponse(r.errorLogger, "FetchBlob", res.Status.Message, hash)
return nil, -1, errors.New(res.Status.Message)
}

req := bs.ReadRequest{
ResourceName: fmt.Sprintf("blobs/%s/%d", res.BlobDigest.Hash, res.BlobDigest.SizeBytes),
}
stream, err := r.clients.bs.Read(ctx, &req)
if err != nil {
logResponse(r.errorLogger, "Read", err.Error(), hash)
return nil, -1, err
}
rc := StreamReadCloser[*bs.ReadResponse]{Stream: stream}
return &rc, res.BlobDigest.SizeBytes, nil
default:
return nil, -1, fmt.Errorf("Unexpected kind %s", kind)
}
}

func (r *remoteGrpcProxyCache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
switch kind {
case cache.RAW:
// RAW cache entries are a special case of AC, used when --disable_http_ac_validation
// is enabled. We can treat them as AC in this scope
fallthrough
case cache.AC:
// There's no "contains" method for the action cache, so the best we can do
// is to get the object and discard the result.
// We don't expect this to ever be called anyway, since it is not part of the grpc protocol.
rc, size, err := r.Get(ctx, kind, hash)
if err != nil || rc == nil || size < 0 {
return false, -1
}
rc.Close()
return true, size
case cache.CAS:
decoded, err := hex.DecodeString(hash)
Collaborator: Why decode the string, just to encode it again below?

Contributor Author: Same as above.

if err != nil {
logResponse(r.errorLogger, "Contains", err.Error(), hash)
return false, -1
}
q := asset.Qualifier{
Name: "checksum.sri",
Value: fmt.Sprintf("sha256-%s", base64.StdEncoding.EncodeToString(decoded)),
}
freq := asset.FetchBlobRequest{
Uris: []string{},
Qualifiers: []*asset.Qualifier{&q},
}

res, err := r.clients.asset.FetchBlob(ctx, &freq)
Collaborator: Why use the remote asset API instead of FindMissingBlobs?

Contributor Author: Same as above.

if err != nil {
logResponse(r.errorLogger, "Contains", err.Error(), hash)
return false, -1
}

if res.Status.GetCode() == int32(codes.NotFound) {
logResponse(r.accessLogger, "Contains", "Not Found", hash)
return false, -1
}
if res.Status.GetCode() != int32(codes.OK) {
logResponse(r.errorLogger, "Contains", res.Status.Message, hash)
return false, -1
}

logResponse(r.accessLogger, "Contains", "Success", hash)
return true, res.BlobDigest.SizeBytes
default:
logResponse(r.errorLogger, "Contains", "Unexpected kind", kind.String())
return false, -1
}
}