Skip to content

Commit

Permalink
[CONNECTOPS-97] Pass in a bespoke HTTP transport to GCS client
Browse files Browse the repository at this point in the history
This will let us experiment with things around the state of the HTTP/2
connection that storage uses, and hopefully track down the cause of the
REFUSED_STREAM errors we are seeing
  • Loading branch information
KJTsanaktsidis committed Nov 6, 2018
1 parent 70213c6 commit 16b2b83
Showing 1 changed file with 93 additions and 4 deletions.
97 changes: 93 additions & 4 deletions physical/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package gcs

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
"strconv"
Expand All @@ -19,6 +23,9 @@ import (

"cloud.google.com/go/storage"
"github.com/armon/go-metrics"
"golang.org/x/net/http2"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
Expand Down Expand Up @@ -79,6 +86,8 @@ type Backend struct {
// logger and permitPool are internal constructs
logger log.Logger
permitPool *physical.PermitPool

keyLogger io.WriteCloser
}

// NewBackend constructs a Google Cloud Storage backend with the given
Expand Down Expand Up @@ -141,28 +150,85 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
)
logger.Debug("creating client")

ctx := context.Background()
dialer := net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: false,
}
tlsConfig := &tls.Config{}

var keyLogger io.WriteCloser
didReturnKeyLogger := false
defer func() {
if keyLogger != nil && !didReturnKeyLogger {
keyLogger.Close()
}
}()
if keylogFile, ok := os.LookupEnv("SSLKEYLOGFILE"); ok {
keyLogger, err = os.OpenFile(keylogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return nil, errwrap.Wrapf("failed to open SSLKEYLOGFILE: {{err}}", err)
}
tlsConfig.KeyLogWriter = keyLogger
}

httpTransport := &http.Transport{
TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: dialer.DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
if err := http2.ConfigureTransport(httpTransport); err != nil {
return nil, errwrap.Wrapf("failed to configure http2: {{err}}", err)
}
uaTransportWrapper := &userAgentTransport{
base: httpTransport,
userAgent: useragent.String(),
}

tokenSource, err := google.DefaultTokenSource(ctx)

// Client
opts := []option.ClientOption{option.WithUserAgent(useragent.String())}
if credentialsFile := c["credentials_file"]; credentialsFile != "" {
logger.Warn("specifying credentials_file as an option is " +
"deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " +
"variable or instance credentials instead.")
opts = append(opts, option.WithServiceAccountFile(credentialsFile))
jsonKeyBytes, err := ioutil.ReadFile(credentialsFile)
if err != nil {
return nil, errwrap.Wrapf("failed to read credentials file: {{err}}", err)
}
tokenSource, err = google.JWTAccessTokenSourceFromJSON(jsonKeyBytes, storage.ScopeFullControl)
if err != nil {
return nil, errwrap.Wrapf("failed to construct credentials token source: {{err}}", err)
}
}

ctx := context.Background()
client, err := storage.NewClient(ctx, opts...)
oauth2Transport := &oauth2.Transport{
Source: tokenSource,
Base: uaTransportWrapper,
}
httpClient := &http.Client{
Transport: oauth2Transport,
}

client, err := storage.NewClient(ctx, option.WithHTTPClient(httpClient))
if err != nil {
return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err)
}

didReturnKeyLogger = true
return &Backend{
bucket: bucket,
haEnabled: haEnabled,

client: client,
permitPool: physical.NewPermitPool(maxParallel),
logger: logger,
keyLogger: keyLogger,
}, nil
}

Expand Down Expand Up @@ -294,3 +360,26 @@ func extractInt(s string) (int, error) {
}
return strconv.Atoi(s)
}

type userAgentTransport struct {
userAgent string
base http.RoundTripper
}

func (t userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) {
rt := t.base
if rt == nil {
return nil, errors.New("transport: no Transport specified")
}
if t.userAgent == "" {
return rt.RoundTrip(req)
}
newReq := *req
newReq.Header = make(http.Header)
for k, vv := range req.Header {
newReq.Header[k] = vv
}
// TODO(cbro): append to existing User-Agent header?
newReq.Header["User-Agent"] = []string{t.userAgent}
return rt.RoundTrip(&newReq)
}

0 comments on commit 16b2b83

Please sign in to comment.