diff --git a/src/go.mod b/src/go.mod index 69e2655e88e7d..3464adfeee4ad 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 - golang.org/x/net v0.0.0-20210825183410-e898025ed96a + golang.org/x/net v0.0.0-20211004220534-69340ce214a7 ) require ( diff --git a/src/go.sum b/src/go.sum index 1c419b90baf8a..18cdc2854a392 100644 --- a/src/go.sum +++ b/src/go.sum @@ -2,6 +2,12 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s3 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211004164453-cedda3a722dd h1:Q6PfiuMddtCdycHT4hrZ7ZhVpAdQlA7qJp+ZhUw7Rdo= +golang.org/x/net v0.0.0-20211004164453-cedda3a722dd/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211004195052-b30845b58a23 h1:j34uvNZ757YpJXjsTk19wPCR/3tAhHPT4EMFysLc9Xg= +golang.org/x/net v0.0.0-20211004195052-b30845b58a23/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211004220534-69340ce214a7 h1:mAWBL9V7JYRSixWOKNpbAMF16bdRk4x94pU+I+WPGW4= +golang.org/x/net v0.0.0-20211004220534-69340ce214a7/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 14f4d0e5fcc4e..29226d4065e36 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -737,6 +737,12 @@ func http2isBadCipher(cipher uint16) bool { // ClientConnPool manages a pool of HTTP/2 client connections. type http2ClientConnPool interface { + // GetClientConn returns a specific HTTP/2 connection (usually + // a TLS-TCP connection) to an HTTP/2 server. On success, the + // returned ClientConn accounts for the upcoming RoundTrip + // call, so the caller should not omit it. If the caller needs + // to, ClientConn.RoundTrip can be called with a bogus + // new(http.Request) to release the stream reservation. GetClientConn(req *Request, addr string) (*http2ClientConn, error) MarkDead(*http2ClientConn) } @@ -763,7 +769,7 @@ type http2clientConnPool struct { conns map[string][]*http2ClientConn // key is host:port dialing map[string]*http2dialCall // currently in-flight dials keys map[*http2ClientConn][]string - addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeede calls + addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeeded calls } func (p *http2clientConnPool) GetClientConn(req *Request, addr string) (*http2ClientConn, error) { @@ -775,28 +781,8 @@ const ( http2noDialOnMiss = false ) -// shouldTraceGetConn reports whether getClientConn should call any -// ClientTrace.GetConn hook associated with the http.Request. -// -// This complexity is needed to avoid double calls of the GetConn hook -// during the back-and-forth between net/http and x/net/http2 (when the -// net/http.Transport is upgraded to also speak http2), as well as support -// the case where x/net/http2 is being used directly. -func (p *http2clientConnPool) shouldTraceGetConn(st http2clientConnIdleState) bool { - // If our Transport wasn't made via ConfigureTransport, always - // trace the GetConn hook if provided, because that means the - // http2 package is being used directly and it's the one - // dialing, as opposed to net/http. - if _, ok := p.t.ConnPool.(http2noDialClientConnPool); !ok { - return true - } - // Otherwise, only use the GetConn hook if this connection has - // been used previously for other requests. For fresh - // connections, the net/http package does the dialing. - return !st.freshConn -} - func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) { + // TODO(dneil): Dial a new connection when t.DisableKeepAlives is set? if http2isConnectionCloseRequest(req) && dialOnMiss { // It gets its own connection. http2traceGetConn(req, addr) @@ -810,10 +796,14 @@ func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMis for { p.mu.Lock() for _, cc := range p.conns[addr] { - if st := cc.idleState(); st.canTakeNewRequest { - if p.shouldTraceGetConn(st) { + if cc.ReserveNewRequest() { + // When a connection is presented to us by the net/http package, + // the GetConn hook has already been called. + // Don't call it a second time here. + if !cc.getConnCalled { http2traceGetConn(req, addr) } + cc.getConnCalled = false p.mu.Unlock() return cc, nil } @@ -829,7 +819,13 @@ func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMis if http2shouldRetryDial(call, req) { continue } - return call.res, call.err + cc, err := call.res, call.err + if err != nil { + return nil, err + } + if cc.ReserveNewRequest() { + return cc, nil + } } } @@ -926,6 +922,7 @@ func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { if err != nil { c.err = err } else { + cc.getConnCalled = true // already called by the net/http package p.addConnLocked(key, cc) } delete(p.addConnCalls, key) @@ -1212,6 +1209,13 @@ func (e http2ErrCode) String() string { return fmt.Sprintf("unknown error code 0x%x", uint32(e)) } +func (e http2ErrCode) stringToken() string { + if s, ok := http2errCodeName[e]; ok { + return s + } + return fmt.Sprintf("ERR_UNKNOWN_%d", uint32(e)) +} + // ConnectionError is an error that results in the termination of the // entire connection. type http2ConnectionError http2ErrCode @@ -1228,6 +1232,11 @@ type http2StreamError struct { Cause error // optional additional detail } +// errFromPeer is a sentinel error value for StreamError.Cause to +// indicate that the StreamError was sent from the peer over the wire +// and wasn't locally generated in the Transport. +var http2errFromPeer = errors.New("received from peer") + func http2streamError(id uint32, code http2ErrCode) http2StreamError { return http2StreamError{StreamID: id, Code: code} } @@ -1442,7 +1451,7 @@ var http2flagName = map[http2FrameType]map[http2Flags]string{ // a frameParser parses a frame given its FrameHeader and payload // bytes. The length of payload will always equal fh.Length (which // might be 0). -type http2frameParser func(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) +type http2frameParser func(fc *http2frameCache, fh http2FrameHeader, countError func(string), payload []byte) (http2Frame, error) var http2frameParsers = map[http2FrameType]http2frameParser{ http2FrameData: http2parseDataFrame, @@ -1587,6 +1596,11 @@ type http2Framer struct { lastFrame http2Frame errDetail error + // countError is a non-nil func that's called on a frame parse + // error with some unique error path token. It's initialized + // from Transport.CountError or Server.CountError. + countError func(errToken string) + // lastHeaderStream is non-zero if the last frame was an // unfinished HEADERS/CONTINUATION. lastHeaderStream uint32 @@ -1749,6 +1763,7 @@ func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { fr := &http2Framer{ w: w, r: r, + countError: func(string) {}, logReads: http2logFrameReads, logWrites: http2logFrameWrites, debugReadLoggerf: log.Printf, @@ -1823,7 +1838,7 @@ func (fr *http2Framer) ReadFrame() (http2Frame, error) { if _, err := io.ReadFull(fr.r, payload); err != nil { return nil, err } - f, err := http2typeFrameParser(fh.Type)(fr.frameCache, fh, payload) + f, err := http2typeFrameParser(fh.Type)(fr.frameCache, fh, fr.countError, payload) if err != nil { if ce, ok := err.(http2connError); ok { return nil, fr.connError(ce.Code, ce.Reason) @@ -1911,13 +1926,14 @@ func (f *http2DataFrame) Data() []byte { return f.data } -func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, countError func(string), payload []byte) (http2Frame, error) { if fh.StreamID == 0 { // DATA frames MUST be associated with a stream. If a // DATA frame is received whose stream identifier // field is 0x0, the recipient MUST respond with a // connection error (Section 5.4.1) of type // PROTOCOL_ERROR. + countError("frame_data_stream_0") return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"} } f := fc.getDataFrame() @@ -1928,6 +1944,7 @@ func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byt var err error payload, padSize, err = http2readByte(payload) if err != nil { + countError("frame_data_pad_byte_short") return nil, err } } @@ -1936,6 +1953,7 @@ func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byt // length of the frame payload, the recipient MUST // treat this as a connection error. // Filed: https://github.com/http2/http2-spec/issues/610 + countError("frame_data_pad_too_big") return nil, http2connError{http2ErrCodeProtocol, "pad size larger than data payload"} } f.data = payload[:len(payload)-int(padSize)] @@ -2018,7 +2036,7 @@ type http2SettingsFrame struct { p []byte } -func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { // When this (ACK 0x1) bit is set, the payload of the // SETTINGS frame MUST be empty. Receipt of a @@ -2026,6 +2044,7 @@ func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) // field value other than 0 MUST be treated as a // connection error (Section 5.4.1) of type // FRAME_SIZE_ERROR. + countError("frame_settings_ack_with_length") return nil, http2ConnectionError(http2ErrCodeFrameSize) } if fh.StreamID != 0 { @@ -2036,14 +2055,17 @@ func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) // field is anything other than 0x0, the endpoint MUST // respond with a connection error (Section 5.4.1) of // type PROTOCOL_ERROR. + countError("frame_settings_has_stream") return nil, http2ConnectionError(http2ErrCodeProtocol) } if len(p)%6 != 0 { + countError("frame_settings_mod_6") // Expecting even number of 6 byte settings. return nil, http2ConnectionError(http2ErrCodeFrameSize) } f := &http2SettingsFrame{http2FrameHeader: fh, p: p} if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 { + countError("frame_settings_window_size_too_big") // Values above the maximum flow control window size of 2^31 - 1 MUST // be treated as a connection error (Section 5.4.1) of type // FLOW_CONTROL_ERROR. @@ -2155,11 +2177,13 @@ type http2PingFrame struct { func (f *http2PingFrame) IsAck() bool { return f.Flags.Has(http2FlagPingAck) } -func http2parsePingFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parsePingFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), payload []byte) (http2Frame, error) { if len(payload) != 8 { + countError("frame_ping_length") return nil, http2ConnectionError(http2ErrCodeFrameSize) } if fh.StreamID != 0 { + countError("frame_ping_has_stream") return nil, http2ConnectionError(http2ErrCodeProtocol) } f := &http2PingFrame{http2FrameHeader: fh} @@ -2195,11 +2219,13 @@ func (f *http2GoAwayFrame) DebugData() []byte { return f.debugData } -func http2parseGoAwayFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseGoAwayFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { if fh.StreamID != 0 { + countError("frame_goaway_has_stream") return nil, http2ConnectionError(http2ErrCodeProtocol) } if len(p) < 8 { + countError("frame_goaway_short") return nil, http2ConnectionError(http2ErrCodeFrameSize) } return &http2GoAwayFrame{ @@ -2235,7 +2261,7 @@ func (f *http2UnknownFrame) Payload() []byte { return f.p } -func http2parseUnknownFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseUnknownFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { return &http2UnknownFrame{fh, p}, nil } @@ -2246,8 +2272,9 @@ type http2WindowUpdateFrame struct { Increment uint32 // never read with high bit set } -func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { if len(p) != 4 { + countError("frame_windowupdate_bad_len") return nil, http2ConnectionError(http2ErrCodeFrameSize) } inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff // mask off high reserved bit @@ -2259,8 +2286,10 @@ func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []by // control window MUST be treated as a connection // error (Section 5.4.1). if fh.StreamID == 0 { + countError("frame_windowupdate_zero_inc_conn") return nil, http2ConnectionError(http2ErrCodeProtocol) } + countError("frame_windowupdate_zero_inc_stream") return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) } return &http2WindowUpdateFrame{ @@ -2311,7 +2340,7 @@ func (f *http2HeadersFrame) HasPriority() bool { return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority) } -func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) { +func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (_ http2Frame, err error) { hf := &http2HeadersFrame{ http2FrameHeader: fh, } @@ -2320,11 +2349,13 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) ( // is received whose stream identifier field is 0x0, the recipient MUST // respond with a connection error (Section 5.4.1) of type // PROTOCOL_ERROR. + countError("frame_headers_zero_stream") return nil, http2connError{http2ErrCodeProtocol, "HEADERS frame with stream ID 0"} } var padLength uint8 if fh.Flags.Has(http2FlagHeadersPadded) { if p, padLength, err = http2readByte(p); err != nil { + countError("frame_headers_pad_short") return } } @@ -2332,16 +2363,19 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) ( var v uint32 p, v, err = http2readUint32(p) if err != nil { + countError("frame_headers_prio_short") return nil, err } hf.Priority.StreamDep = v & 0x7fffffff hf.Priority.Exclusive = (v != hf.Priority.StreamDep) // high bit was set p, hf.Priority.Weight, err = http2readByte(p) if err != nil { + countError("frame_headers_prio_weight_short") return nil, err } } - if len(p)-int(padLength) <= 0 { + if len(p)-int(padLength) < 0 { + countError("frame_headers_pad_too_big") return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) } hf.headerFragBuf = p[:len(p)-int(padLength)] @@ -2448,11 +2482,13 @@ func (p http2PriorityParam) IsZero() bool { return p == http2PriorityParam{} } -func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), payload []byte) (http2Frame, error) { if fh.StreamID == 0 { + countError("frame_priority_zero_stream") return nil, http2connError{http2ErrCodeProtocol, "PRIORITY frame with stream ID 0"} } if len(payload) != 5 { + countError("frame_priority_bad_length") return nil, http2connError{http2ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))} } v := binary.BigEndian.Uint32(payload[:4]) @@ -2495,11 +2531,13 @@ type http2RSTStreamFrame struct { ErrCode http2ErrCode } -func http2parseRSTStreamFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseRSTStreamFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { if len(p) != 4 { + countError("frame_rststream_bad_len") return nil, http2ConnectionError(http2ErrCodeFrameSize) } if fh.StreamID == 0 { + countError("frame_rststream_zero_stream") return nil, http2ConnectionError(http2ErrCodeProtocol) } return &http2RSTStreamFrame{fh, http2ErrCode(binary.BigEndian.Uint32(p[:4]))}, nil @@ -2525,8 +2563,9 @@ type http2ContinuationFrame struct { headerFragBuf []byte } -func http2parseContinuationFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseContinuationFrame(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (http2Frame, error) { if fh.StreamID == 0 { + countError("frame_continuation_zero_stream") return nil, http2connError{http2ErrCodeProtocol, "CONTINUATION frame with stream ID 0"} } return &http2ContinuationFrame{fh, p}, nil @@ -2575,7 +2614,7 @@ func (f *http2PushPromiseFrame) HeadersEnded() bool { return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders) } -func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) { +func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, countError func(string), p []byte) (_ http2Frame, err error) { pp := &http2PushPromiseFrame{ http2FrameHeader: fh, } @@ -2586,6 +2625,7 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ // with. If the stream identifier field specifies the value // 0x0, a recipient MUST respond with a connection error // (Section 5.4.1) of type PROTOCOL_ERROR. + countError("frame_pushpromise_zero_stream") return nil, http2ConnectionError(http2ErrCodeProtocol) } // The PUSH_PROMISE frame includes optional padding. @@ -2593,18 +2633,21 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ var padLength uint8 if fh.Flags.Has(http2FlagPushPromisePadded) { if p, padLength, err = http2readByte(p); err != nil { + countError("frame_pushpromise_pad_short") return } } p, pp.PromiseID, err = http2readUint32(p) if err != nil { + countError("frame_pushpromise_promiseid_short") return } pp.PromiseID = pp.PromiseID & (1<<31 - 1) if int(padLength) > len(p) { // like the DATA frame, error out if padding is longer than the body. + countError("frame_pushpromise_pad_too_big") return nil, http2ConnectionError(http2ErrCodeProtocol) } pp.headerFragBuf = p[:len(p)-int(padLength)] @@ -3574,6 +3617,17 @@ type http2pipeBuffer interface { io.Reader } +// setBuffer initializes the pipe buffer. +// It has no effect if the pipe is already closed. +func (p *http2pipe) setBuffer(b http2pipeBuffer) { + p.mu.Lock() + defer p.mu.Unlock() + if p.err != nil || p.breakErr != nil { + return + } + p.b = b +} + func (p *http2pipe) Len() int { p.mu.Lock() defer p.mu.Unlock() @@ -3790,6 +3844,12 @@ type http2Server struct { // If nil, a default scheduler is chosen. NewWriteScheduler func() http2WriteScheduler + // CountError, if non-nil, is called on HTTP/2 server errors. + // It's intended to increment a metric for monitoring, such + // as an expvar or Prometheus metric. + // The errType consists of only ASCII word characters. + CountError func(errType string) + // Internal state. This is a pointer (rather than embedded directly) // so that we don't embed a Mutex in this struct, which will make the // struct non-copyable, which might break some callers. @@ -4065,6 +4125,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) fr := http2NewFramer(sc.bw, c) + if s.CountError != nil { + fr.countError = s.CountError + } fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) fr.MaxHeaderListSize = sc.maxHeaderListSize() fr.SetMaxReadFrameSize(s.maxReadFrameSize()) @@ -5064,7 +5127,7 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { // First frame received must be SETTINGS. if !sc.sawFirstSettings { if _, ok := f.(*http2SettingsFrame); !ok { - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("first_settings", http2ConnectionError(http2ErrCodeProtocol)) } sc.sawFirstSettings = true } @@ -5089,7 +5152,7 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { case *http2PushPromiseFrame: // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("push_promise", http2ConnectionError(http2ErrCodeProtocol)) default: sc.vlogf("http2: server ignoring frame: %v", f.Header()) return nil @@ -5109,7 +5172,7 @@ func (sc *http2serverConn) processPing(f *http2PingFrame) error { // identifier field value other than 0x0, the recipient MUST // respond with a connection error (Section 5.4.1) of type // PROTOCOL_ERROR." - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("ping_on_stream", http2ConnectionError(http2ErrCodeProtocol)) } if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo { return nil @@ -5128,7 +5191,7 @@ func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error // or PRIORITY on a stream in this state MUST be // treated as a connection error (Section 5.4.1) of // type PROTOCOL_ERROR." - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("stream_idle", http2ConnectionError(http2ErrCodeProtocol)) } if st == nil { // "WINDOW_UPDATE can be sent by a peer that has sent a @@ -5139,7 +5202,7 @@ func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error return nil } if !st.flow.add(int32(f.Increment)) { - return http2streamError(f.StreamID, http2ErrCodeFlowControl) + return sc.countError("bad_flow", http2streamError(f.StreamID, http2ErrCodeFlowControl)) } default: // connection-level flow control if !sc.flow.add(int32(f.Increment)) { @@ -5160,7 +5223,7 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { // identifying an idle stream is received, the // recipient MUST treat this as a connection error // (Section 5.4.1) of type PROTOCOL_ERROR. - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("reset_idle_stream", http2ConnectionError(http2ErrCodeProtocol)) } if st != nil { st.cancelCtx() @@ -5212,7 +5275,7 @@ func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error { // Why is the peer ACKing settings we never sent? // The spec doesn't mention this case, but // hang up on them anyway. - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("ack_mystery", http2ConnectionError(http2ErrCodeProtocol)) } return nil } @@ -5220,7 +5283,7 @@ func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error { // This isn't actually in the spec, but hang up on // suspiciously large settings frames or those with // duplicate entries. - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("settings_big_or_dups", http2ConnectionError(http2ErrCodeProtocol)) } if err := f.ForeachSetting(sc.processSetting); err != nil { return err @@ -5287,7 +5350,7 @@ func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { // control window to exceed the maximum size as a // connection error (Section 5.4.1) of type // FLOW_CONTROL_ERROR." - return http2ConnectionError(http2ErrCodeFlowControl) + return sc.countError("setting_win_size", http2ConnectionError(http2ErrCodeFlowControl)) } } return nil @@ -5320,7 +5383,7 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // or PRIORITY on a stream in this state MUST be // treated as a connection error (Section 5.4.1) of // type PROTOCOL_ERROR." - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("data_on_idle", http2ConnectionError(http2ErrCodeProtocol)) } // "If a DATA frame is received whose stream is not in "open" @@ -5337,7 +5400,7 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // and return any flow control bytes since we're not going // to consume them. if sc.inflow.available() < int32(f.Length) { - return http2streamError(id, http2ErrCodeFlowControl) + return sc.countError("data_flow", http2streamError(id, http2ErrCodeFlowControl)) } // Deduct the flow control from inflow, since we're // going to immediately add it back in @@ -5350,7 +5413,7 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // Already have a stream error in flight. Don't send another. return nil } - return http2streamError(id, http2ErrCodeStreamClosed) + return sc.countError("closed", http2streamError(id, http2ErrCodeStreamClosed)) } if st.body == nil { panic("internal error: should have a body in this state") @@ -5362,12 +5425,12 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the // value of a content-length header field does not equal the sum of the // DATA frame payload lengths that form the body. - return http2streamError(id, http2ErrCodeProtocol) + return sc.countError("send_too_much", http2streamError(id, http2ErrCodeProtocol)) } if f.Length > 0 { // Check whether the client has flow control quota. if st.inflow.available() < int32(f.Length) { - return http2streamError(id, http2ErrCodeFlowControl) + return sc.countError("flow_on_data_length", http2streamError(id, http2ErrCodeFlowControl)) } st.inflow.take(int32(f.Length)) @@ -5375,7 +5438,7 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { wrote, err := st.body.Write(data) if err != nil { sc.sendWindowUpdate(nil, int(f.Length)-wrote) - return http2streamError(id, http2ErrCodeStreamClosed) + return sc.countError("body_write_err", http2streamError(id, http2ErrCodeStreamClosed)) } if wrote != len(data) { panic("internal error: bad Writer") @@ -5461,7 +5524,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { // stream identifier MUST respond with a connection error // (Section 5.4.1) of type PROTOCOL_ERROR. if id%2 != 1 { - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("headers_even", http2ConnectionError(http2ErrCodeProtocol)) } // A HEADERS frame can be used to create a new stream or // send a trailer for an open one. If we already have a stream @@ -5478,7 +5541,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { // this state, it MUST respond with a stream error (Section 5.4.2) of // type STREAM_CLOSED. if st.state == http2stateHalfClosedRemote { - return http2streamError(id, http2ErrCodeStreamClosed) + return sc.countError("headers_half_closed", http2streamError(id, http2ErrCodeStreamClosed)) } return st.processTrailerHeaders(f) } @@ -5489,7 +5552,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { // receives an unexpected stream identifier MUST respond with // a connection error (Section 5.4.1) of type PROTOCOL_ERROR. if id <= sc.maxClientStreamID { - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("stream_went_down", http2ConnectionError(http2ErrCodeProtocol)) } sc.maxClientStreamID = id @@ -5506,14 +5569,14 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { if sc.curClientStreams+1 > sc.advMaxStreams { if sc.unackedSettings == 0 { // They should know better. - return http2streamError(id, http2ErrCodeProtocol) + return sc.countError("over_max_streams", http2streamError(id, http2ErrCodeProtocol)) } // Assume it's a network race, where they just haven't // received our last SETTINGS update. But actually // this can't happen yet, because we don't yet provide // a way for users to adjust server parameters at // runtime. - return http2streamError(id, http2ErrCodeRefusedStream) + return sc.countError("over_max_streams_race", http2streamError(id, http2ErrCodeRefusedStream)) } initialState := http2stateOpen @@ -5523,7 +5586,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { st := sc.newStream(id, 0, initialState) if f.HasPriority() { - if err := http2checkPriority(f.StreamID, f.Priority); err != nil { + if err := sc.checkPriority(f.StreamID, f.Priority); err != nil { return err } sc.writeSched.AdjustStream(st.id, f.Priority) @@ -5567,15 +5630,15 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { sc := st.sc sc.serveG.check() if st.gotTrailerHeader { - return http2ConnectionError(http2ErrCodeProtocol) + return sc.countError("dup_trailers", http2ConnectionError(http2ErrCodeProtocol)) } st.gotTrailerHeader = true if !f.StreamEnded() { - return http2streamError(st.id, http2ErrCodeProtocol) + return sc.countError("trailers_not_ended", http2streamError(st.id, http2ErrCodeProtocol)) } if len(f.PseudoFields()) > 0 { - return http2streamError(st.id, http2ErrCodeProtocol) + return sc.countError("trailers_pseudo", http2streamError(st.id, http2ErrCodeProtocol)) } if st.trailer != nil { for _, hf := range f.RegularFields() { @@ -5584,7 +5647,7 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { // TODO: send more details to the peer somehow. But http2 has // no way to send debug data at a stream level. Discuss with // HTTP folk. - return http2streamError(st.id, http2ErrCodeProtocol) + return sc.countError("trailers_bogus", http2streamError(st.id, http2ErrCodeProtocol)) } st.trailer[key] = append(st.trailer[key], hf.Value) } @@ -5593,13 +5656,13 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { return nil } -func http2checkPriority(streamID uint32, p http2PriorityParam) error { +func (sc *http2serverConn) checkPriority(streamID uint32, p http2PriorityParam) error { if streamID == p.StreamDep { // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR." // Section 5.3.3 says that a stream can depend on one of its dependencies, // so it's only self-dependencies that are forbidden. - return http2streamError(streamID, http2ErrCodeProtocol) + return sc.countError("priority", http2streamError(streamID, http2ErrCodeProtocol)) } return nil } @@ -5608,7 +5671,7 @@ func (sc *http2serverConn) processPriority(f *http2PriorityFrame) error { if sc.inGoAway { return nil } - if err := http2checkPriority(f.StreamID, f.http2PriorityParam); err != nil { + if err := sc.checkPriority(f.StreamID, f.http2PriorityParam); err != nil { return err } sc.writeSched.AdjustStream(f.StreamID, f.http2PriorityParam) @@ -5665,7 +5728,7 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead isConnect := rp.method == "CONNECT" if isConnect { if rp.path != "" || rp.scheme != "" || rp.authority == "" { - return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) + return nil, nil, sc.countError("bad_connect", http2streamError(f.StreamID, http2ErrCodeProtocol)) } } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { // See 8.1.2.6 Malformed Requests and Responses: @@ -5678,13 +5741,13 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead // "All HTTP/2 requests MUST include exactly one valid // value for the :method, :scheme, and :path // pseudo-header fields" - return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) + return nil, nil, sc.countError("bad_path_method", http2streamError(f.StreamID, http2ErrCodeProtocol)) } bodyOpen := !f.StreamEnded() if rp.method == "HEAD" && bodyOpen { // HEAD requests can't have bodies - return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) + return nil, nil, sc.countError("head_body", http2streamError(f.StreamID, http2ErrCodeProtocol)) } rp.header = make(Header) @@ -5767,7 +5830,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re var err error url_, err = url.ParseRequestURI(rp.path) if err != nil { - return nil, nil, http2streamError(st.id, http2ErrCodeProtocol) + return nil, nil, sc.countError("bad_path", http2streamError(st.id, http2ErrCodeProtocol)) } requestURI = rp.path } @@ -6651,6 +6714,34 @@ func http2h1ServerKeepAlivesDisabled(hs *Server) bool { return false } +func (sc *http2serverConn) countError(name string, err error) error { + if sc == nil || sc.srv == nil { + return err + } + f := sc.srv.CountError + if f == nil { + return err + } + var typ string + var code http2ErrCode + switch e := err.(type) { + case http2ConnectionError: + typ = "conn" + code = http2ErrCode(e) + case http2StreamError: + typ = "stream" + code = http2ErrCode(e.Code) + default: + return err + } + codeStr := http2errCodeName[code] + if codeStr == "" { + codeStr = strconv.Itoa(int(code)) + } + f(fmt.Sprintf("%s_%s_%s", typ, codeStr, name)) + return err +} + const ( // transportDefaultConnFlow is how many connection-level flow control // tokens we give the server at start-up, past the default 64k. @@ -6666,6 +6757,15 @@ const ( http2transportDefaultStreamMinRefresh = 4 << 10 http2defaultUserAgent = "Go-http-client/2.0" + + // initialMaxConcurrentStreams is a connections maxConcurrentStreams until + // it's received servers initial SETTINGS frame, which corresponds with the + // spec's minimum recommended value. + http2initialMaxConcurrentStreams = 100 + + // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams + // if the server doesn't include one in its initial SETTINGS frame. + http2defaultMaxConcurrentStreams = 1000 ) // Transport is an HTTP/2 Transport. @@ -6736,6 +6836,12 @@ type http2Transport struct { // Defaults to 15s. PingTimeout time.Duration + // CountError, if non-nil, is called on HTTP/2 transport errors. + // It's intended to increment a metric for monitoring, such + // as an expvar or Prometheus metric. + // The errType consists of only ASCII word characters. + CountError func(errType string) + // t1, if non-nil, is the standard library Transport using // this transport. Its settings are used (but not its // RoundTrip method, etc). @@ -6842,11 +6948,12 @@ func (t *http2Transport) initConnPool() { // ClientConn is the state of a single HTTP/2 client connection to an // HTTP/2 server. type http2ClientConn struct { - t *http2Transport - tconn net.Conn // usually *tls.Conn, except specialized impls - tlsState *tls.ConnectionState // nil only for specialized impls - reused uint32 // whether conn is being reused; atomic - singleUse bool // whether being used for a single http.Request + t *http2Transport + tconn net.Conn // usually *tls.Conn, except specialized impls + tlsState *tls.ConnectionState // nil only for specialized impls + reused uint32 // whether conn is being reused; atomic + singleUse bool // whether being used for a single http.Request + getConnCalled bool // used by clientConnPool // readLoop goroutine fields: readerDone chan struct{} // closed on error @@ -6859,31 +6966,41 @@ type http2ClientConn struct { cond *sync.Cond // hold mu; broadcast on flow/closed changes flow http2flow // our conn-level flow control quota (cs.flow is per stream) inflow http2flow // peer's conn-level flow control + doNotReuse bool // whether conn is marked to not be reused for any future requests closing bool closed bool + seenSettings bool // true if we've seen a settings frame, false otherwise wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back goAway *http2GoAwayFrame // if non-nil, the GoAwayFrame we received goAwayDebug string // goAway frame's debug data, retained as a string streams map[uint32]*http2clientStream // client-initiated + streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip nextStreamID uint32 pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams pings map[[8]byte]chan struct{} // in flight ping data to notification channel - bw *bufio.Writer br *bufio.Reader - fr *http2Framer lastActive time.Time lastIdle time.Time // time last idle - // Settings from peer: (also guarded by mu) + // Settings from peer: (also guarded by wmu) maxFrameSize uint32 maxConcurrentStreams uint32 peerMaxHeaderListSize uint64 initialWindowSize uint32 + // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests. + // Write to reqHeaderMu to lock it, read from it to unlock. + // Lock reqmu BEFORE mu or wmu. + reqHeaderMu chan struct{} + + // wmu is held while writing. + // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes. + // Only acquire both at the same time when changing peer settings. + wmu sync.Mutex + bw *bufio.Writer + fr *http2Framer + werr error // first write error that has occurred hbuf bytes.Buffer // HPACK encoder writes into this henc *hpack.Encoder - - wmu sync.Mutex // held while writing; acquire AFTER mu if holding both - werr error // first write error that has occurred } // clientStream is the state for a single HTTP/2 stream. One of these @@ -6893,52 +7010,42 @@ type http2clientStream struct { req *Request trace *httptrace.ClientTrace // or nil ID uint32 - resc chan http2resAndError bufPipe http2pipe // buffered pipe with the flow-controlled response payload - startedWrite bool // started request body write; guarded by cc.mu requestedGzip bool - on100 func() // optional code to run if get a 100 continue response + + abortOnce sync.Once + abort chan struct{} // closed to signal stream should end immediately + abortErr error // set if abort is closed + + peerClosed chan struct{} // closed when the peer sends an END_STREAM flag + donec chan struct{} // closed after the stream is in the closed state + on100 chan struct{} // buffered; written to if a 100 is received + + respHeaderRecv chan struct{} // closed when headers are received + res *Response // set if respHeaderRecv is closed flow http2flow // guarded by cc.mu inflow http2flow // guarded by cc.mu bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read readErr error // sticky read error; owned by transportResponseBody.Read stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu - didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu - peerReset chan struct{} // closed on peer reset - resetErr error // populated before peerReset is closed - - done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu + // owned by writeRequest: + sentEndStream bool // sent an END_STREAM flag to the peer + sentHeaders bool // owned by clientConnReadLoop: firstByte bool // got the first response byte pastHeaders bool // got first MetaHeadersFrame (actual headers) pastTrailers bool // got optional second MetaHeadersFrame (trailers) num1xx uint8 // number of 1xx responses seen + readClosed bool // peer sent an END_STREAM flag + readAborted bool // read loop reset the stream trailer Header // accumulated trailers resTrailer *Header // client's Response.Trailer } -// awaitRequestCancel waits for the user to cancel a request or for the done -// channel to be signaled. A non-nil error is returned only if the request was -// canceled. -func http2awaitRequestCancel(req *Request, done <-chan struct{}) error { - ctx := req.Context() - if req.Cancel == nil && ctx.Done() == nil { - return nil - } - select { - case <-req.Cancel: - return http2errRequestCanceled - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } -} - var http2got1xxFuncForTests func(int, textproto.MIMEHeader) error // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func, @@ -6950,50 +7057,24 @@ func (cs *http2clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) e return http2traceGot1xxResponseFunc(cs.trace) } -// awaitRequestCancel waits for the user to cancel a request, its context to -// expire, or for the request to be done (any way it might be removed from the -// cc.streams map: peer reset, successful completion, TCP connection breakage, -// etc). If the request is canceled, then cs will be canceled and closed. -func (cs *http2clientStream) awaitRequestCancel(req *Request) { - if err := http2awaitRequestCancel(req, cs.done); err != nil { - cs.cancelStream() - cs.bufPipe.CloseWithError(err) - } +func (cs *http2clientStream) abortStream(err error) { + cs.cc.mu.Lock() + defer cs.cc.mu.Unlock() + cs.abortStreamLocked(err) } -func (cs *http2clientStream) cancelStream() { - cc := cs.cc - cc.mu.Lock() - didReset := cs.didReset - cs.didReset = true - cc.mu.Unlock() - - if !didReset { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - cc.forgetStreamID(cs.ID) - } -} - -// checkResetOrDone reports any error sent in a RST_STREAM frame by the -// server, or errStreamClosed if the stream is complete. -func (cs *http2clientStream) checkResetOrDone() error { - select { - case <-cs.peerReset: - return cs.resetErr - case <-cs.done: - return http2errStreamClosed - default: - return nil +func (cs *http2clientStream) abortStreamLocked(err error) { + cs.abortOnce.Do(func() { + cs.abortErr = err + close(cs.abort) + }) + // TODO(dneil): Clean up tests where cs.cc.cond is nil. + if cs.cc.cond != nil { + // Wake up writeRequestBody if it is waiting on flow control. + cs.cc.cond.Broadcast() } } -func (cs *http2clientStream) getStartedWrite() bool { - cc := cs.cc - cc.mu.Lock() - defer cc.mu.Unlock() - return cs.startedWrite -} - func (cs *http2clientStream) abortRequestBodyWrite(err error) { if err == nil { panic("nil error") @@ -7002,9 +7083,6 @@ func (cs *http2clientStream) abortRequestBodyWrite(err error) { cc.mu.Lock() if cs.stopReqBody == nil { cs.stopReqBody = err - if cs.req.Body != nil { - cs.req.Body.Close() - } cc.cond.Broadcast() } cc.mu.Unlock() @@ -7095,9 +7173,9 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1) http2traceGotConn(req, cc, reused) - res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req) + res, err := cc.RoundTrip(req) if err != nil && retry <= 6 { - if req, err = http2shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil { + if req, err = http2shouldRetryRequest(req, err); err == nil { // After the first retry, do exponential backoff with 10% jitter. if retry == 0 { continue @@ -7108,7 +7186,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res case <-time.After(time.Second * time.Duration(backoff)): continue case <-req.Context().Done(): - return nil, req.Context().Err() + err = req.Context().Err() } } } @@ -7139,7 +7217,7 @@ var ( // response headers. It is always called with a non-nil error. // It returns either a request to retry (either the same request, or a // modified clone), or an error if the request can't be replayed. -func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) { +func http2shouldRetryRequest(req *Request, err error) (*Request, error) { if !http2canRetryError(err) { return nil, err } @@ -7152,7 +7230,6 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req // If the request body can be reset back to its original // state via the optional req.GetBody, do that. if req.GetBody != nil { - // TODO: consider a req.Body.Close here? or audit that all caller paths do? body, err := req.GetBody() if err != nil { return nil, err @@ -7164,10 +7241,8 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req // The Request.Body can't reset back to the beginning, but we // don't seem to have started to read from it yet, so reuse - // the request directly. The "afterBodyWrite" means the - // bodyWrite process has started, which becomes true before - // the first Read. - if !afterBodyWrite { + // the request directly. + if err == http2errClientConnUnusable { return req, nil } @@ -7179,6 +7254,10 @@ func http2canRetryError(err error) bool { return true } if se, ok := err.(http2StreamError); ok { + if se.Code == http2ErrCodeProtocol && se.Cause == http2errFromPeer { + // See golang/go#47635, golang/go#42777 + return true + } return se.Code == http2ErrCodeRefusedStream } return false @@ -7253,14 +7332,15 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, - maxFrameSize: 16 << 10, // spec default - initialWindowSize: 65535, // spec default - maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. - peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. + peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), + reqHeaderMu: make(chan struct{}, 1), } if d := t.idleConnTimeout(); d != 0 { cc.idleTimeout = d @@ -7278,6 +7358,9 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr}) cc.br = bufio.NewReader(c) cc.fr = http2NewFramer(cc.bw, cc.br) + if t.CountError != nil { + cc.fr.countError = t.CountError + } cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize() @@ -7330,6 +7413,13 @@ func (cc *http2ClientConn) healthCheck() { } } +// SetDoNotReuse marks cc as not reusable for future HTTP requests. +func (cc *http2ClientConn) SetDoNotReuse() { + cc.mu.Lock() + defer cc.mu.Unlock() + cc.doNotReuse = true +} + func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { cc.mu.Lock() defer cc.mu.Unlock() @@ -7347,27 +7437,39 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { last := f.LastStreamID for streamID, cs := range cc.streams { if streamID > last { - select { - case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}: - default: - } + cs.abortStreamLocked(http2errClientConnGotGoAway) } } } // CanTakeNewRequest reports whether the connection can take a new request, // meaning it has not been closed or received or sent a GOAWAY. +// +// If the caller is going to immediately make a new request on this +// connection, use ReserveNewRequest instead. func (cc *http2ClientConn) CanTakeNewRequest() bool { cc.mu.Lock() defer cc.mu.Unlock() return cc.canTakeNewRequestLocked() } +// ReserveNewRequest is like CanTakeNewRequest but also reserves a +// concurrent stream in cc. The reservation is decremented on the +// next call to RoundTrip. +func (cc *http2ClientConn) ReserveNewRequest() bool { + cc.mu.Lock() + defer cc.mu.Unlock() + if st := cc.idleStateLocked(); !st.canTakeNewRequest { + return false + } + cc.streamsReserved++ + return true +} + // clientConnIdleState describes the suitability of a client // connection to initiate a new RoundTrip request. type http2clientConnIdleState struct { canTakeNewRequest bool - freshConn bool // whether it's unused by any previous request } func (cc *http2ClientConn) idleState() http2clientConnIdleState { @@ -7388,13 +7490,13 @@ func (cc *http2ClientConn) idleStateLocked() (st http2clientConnIdleState) { // writing it. maxConcurrentOkay = true } else { - maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) + maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams) } st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && + !cc.doNotReuse && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked() - st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest return } @@ -7425,7 +7527,7 @@ func (cc *http2ClientConn) onIdleTimeout() { func (cc *http2ClientConn) closeIfIdle() { cc.mu.Lock() - if len(cc.streams) > 0 { + if len(cc.streams) > 0 || cc.streamsReserved > 0 { cc.mu.Unlock() return } @@ -7440,9 +7542,15 @@ func (cc *http2ClientConn) closeIfIdle() { cc.tconn.Close() } +func (cc *http2ClientConn) isDoNotReuseAndIdle() bool { + cc.mu.Lock() + defer cc.mu.Unlock() + return cc.doNotReuse && len(cc.streams) == 0 +} + var http2shutdownEnterWaitStateHook = func() {} -// Shutdown gracefully close the client connection, waiting for running streams to complete. +// Shutdown gracefully closes the client connection, waiting for running streams to complete. func (cc *http2ClientConn) Shutdown(ctx context.Context) error { if err := cc.sendGoAway(); err != nil { return err @@ -7481,15 +7589,18 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error { func (cc *http2ClientConn) sendGoAway() error { cc.mu.Lock() - defer cc.mu.Unlock() - cc.wmu.Lock() - defer cc.wmu.Unlock() - if cc.closing { + closing := cc.closing + cc.closing = true + maxStreamID := cc.nextStreamID + cc.mu.Unlock() + if closing { // GOAWAY sent already return nil } + + cc.wmu.Lock() + defer cc.wmu.Unlock() // Send a graceful shutdown frame to server - maxStreamID := cc.nextStreamID if err := cc.fr.WriteGoAway(maxStreamID, http2ErrCodeNo, nil); err != nil { return err } @@ -7497,7 +7608,6 @@ func (cc *http2ClientConn) sendGoAway() error { return err } // Prevent new requests - cc.closing = true return nil } @@ -7505,17 +7615,12 @@ func (cc *http2ClientConn) sendGoAway() error { // err is sent to streams. func (cc *http2ClientConn) closeForError(err error) error { cc.mu.Lock() + cc.closed = true + for _, cs := range cc.streams { + cs.abortStreamLocked(err) + } defer cc.cond.Broadcast() defer cc.mu.Unlock() - for id, cs := range cc.streams { - select { - case cs.resc <- http2resAndError{err: err}: - default: - } - cs.bufPipe.CloseWithError(err) - delete(cc.streams, id) - } - cc.closed = true return cc.tconn.Close() } @@ -7530,6 +7635,9 @@ func (cc *http2ClientConn) Close() error { // closes the client connection immediately. In-flight requests are interrupted. func (cc *http2ClientConn) closeForLostPing() error { err := errors.New("http2: client connection lost") + if f := cc.t.CountError; f != nil { + f("conn_close_lost_ping") + } return cc.closeForError(err) } @@ -7594,37 +7702,132 @@ func http2actualContentLength(req *Request) int64 { return -1 } -func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { - resp, _, err := cc.roundTrip(req) - return resp, err +func (cc *http2ClientConn) decrStreamReservations() { + cc.mu.Lock() + defer cc.mu.Unlock() + cc.decrStreamReservationsLocked() } -func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterReqBodyWrite bool, err error) { - if err := http2checkConnHeaders(req); err != nil { - return nil, false, err +func (cc *http2ClientConn) decrStreamReservationsLocked() { + if cc.streamsReserved > 0 { + cc.streamsReserved-- } - if cc.idleTimer != nil { - cc.idleTimer.Stop() +} + +func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { + ctx := req.Context() + cs := &http2clientStream{ + cc: cc, + req: req, + trace: httptrace.ContextClientTrace(req.Context()), + peerClosed: make(chan struct{}), + abort: make(chan struct{}), + respHeaderRecv: make(chan struct{}), + donec: make(chan struct{}), } + go cs.doRequest() - trailers, err := http2commaSeparatedTrailers(req) - if err != nil { - return nil, false, err + waitDone := func() error { + select { + case <-cs.donec: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-req.Cancel: + return http2errRequestCanceled + } + } + + for { + select { + case <-cs.respHeaderRecv: + res := cs.res + if res.StatusCode > 299 { + // On error or status code 3xx, 4xx, 5xx, etc abort any + // ongoing write, assuming that the server doesn't care + // about our request body. If the server replied with 1xx or + // 2xx, however, then assume the server DOES potentially + // want our body (e.g. full-duplex streaming: + // golang.org/issue/13444). If it turns out the server + // doesn't, they'll RST_STREAM us soon enough. This is a + // heuristic to avoid adding knobs to Transport. Hopefully + // we can keep it. + cs.abortRequestBodyWrite(http2errStopReqBodyWrite) + } + res.Request = req + res.TLS = cc.tlsState + if res.Body == http2noBody && http2actualContentLength(req) == 0 { + // If there isn't a request or response body still being + // written, then wait for the stream to be closed before + // RoundTrip returns. + if err := waitDone(); err != nil { + return nil, err + } + } + return res, nil + case <-cs.abort: + waitDone() + return nil, cs.abortErr + case <-ctx.Done(): + return nil, ctx.Err() + case <-req.Cancel: + return nil, http2errRequestCanceled + } + } +} + +// doRequest runs for the duration of the request lifetime. +// +// It sends the request and performs post-request cleanup (closing Request.Body, etc.). +func (cs *http2clientStream) doRequest() { + err := cs.writeRequest() + cs.cleanupWriteRequest(err) +} + +// writeRequest sends a request. +// +// It returns nil after the request is written, the response read, +// and the request stream is half-closed by the peer. +// +// It returns non-nil if the request ends otherwise. +// If the returned error is StreamError, the error Code may be used in resetting the stream. +func (cs *http2clientStream) writeRequest() (err error) { + cc := cs.cc + req := cs.req + ctx := req.Context() + + if err := http2checkConnHeaders(cs.req); err != nil { + return err + } + + // Acquire the new-request lock by writing to reqHeaderMu. + // This lock guards the critical section covering allocating a new stream ID + // (requires mu) and creating the stream (requires wmu). + if cc.reqHeaderMu == nil { + panic("RoundTrip on uninitialized ClientConn") // for tests + } + select { + case cc.reqHeaderMu <- struct{}{}: + case <-req.Cancel: + return http2errRequestCanceled + case <-ctx.Done(): + return ctx.Err() } - hasTrailers := trailers != "" cc.mu.Lock() - if err := cc.awaitOpenSlotForRequest(req); err != nil { + if cc.idleTimer != nil { + cc.idleTimer.Stop() + } + cc.decrStreamReservationsLocked() + if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil { cc.mu.Unlock() - return nil, false, err + <-cc.reqHeaderMu + return err } - - body := req.Body - contentLen := http2actualContentLength(req) - hasBody := contentLen != 0 + cc.addStreamLocked(cs) // assigns stream ID + cc.mu.Unlock() // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? - var requestedGzip bool if !cc.t.disableCompression() && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && @@ -7641,183 +7844,218 @@ func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterRe // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail // anyway. See https://golang.org/issue/8923 - requestedGzip = true + cs.requestedGzip = true } - // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is - // sent by writeRequestBody below, along with any Trailers, - // again in form HEADERS{1}, CONTINUATION{0,}) - hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) - if err != nil { - cc.mu.Unlock() - return nil, false, err + continueTimeout := cc.t.expectContinueTimeout() + if continueTimeout != 0 && + !httpguts.HeaderValuesContainsToken( + cs.req.Header["Expect"], + "100-continue") { + continueTimeout = 0 + cs.on100 = make(chan struct{}, 1) } - cs := cc.newStream() - cs.req = req - cs.trace = httptrace.ContextClientTrace(req.Context()) - cs.requestedGzip = requestedGzip - bodyWriter := cc.t.getBodyWriterState(cs, body) - cs.on100 = bodyWriter.on100 + err = cs.encodeAndWriteHeaders() + <-cc.reqHeaderMu + if err != nil { + return err + } - defer func() { - cc.wmu.Lock() - werr := cc.werr - cc.wmu.Unlock() - if werr != nil { - cc.Close() + hasBody := http2actualContentLength(cs.req) != 0 + if !hasBody { + cs.sentEndStream = true + } else { + if continueTimeout != 0 { + http2traceWait100Continue(cs.trace) + timer := time.NewTimer(continueTimeout) + select { + case <-timer.C: + err = nil + case <-cs.on100: + err = nil + case <-cs.abort: + err = cs.abortErr + case <-ctx.Done(): + err = ctx.Err() + case <-req.Cancel: + err = http2errRequestCanceled + } + timer.Stop() + if err != nil { + http2traceWroteRequest(cs.trace, err) + return err + } } - }() - - cc.wmu.Lock() - endStream := !hasBody && !hasTrailers - werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) - cc.wmu.Unlock() - http2traceWroteHeaders(cs.trace) - cc.mu.Unlock() - if werr != nil { - if hasBody { - req.Body.Close() // per RoundTripper contract - bodyWriter.cancel() + if err = cs.writeRequestBody(req.Body); err != nil { + if err != http2errStopReqBodyWrite { + http2traceWroteRequest(cs.trace, err) + return err + } + } else { + cs.sentEndStream = true } - cc.forgetStreamID(cs.ID) - // Don't bother sending a RST_STREAM (our write already failed; - // no need to keep writing) - http2traceWroteRequest(cs.trace, werr) - return nil, false, werr } + http2traceWroteRequest(cs.trace, err) + var respHeaderTimer <-chan time.Time - if hasBody { - bodyWriter.scheduleBodyWrite() - } else { - http2traceWroteRequest(cs.trace, nil) - if d := cc.responseHeaderTimeout(); d != 0 { - timer := time.NewTimer(d) - defer timer.Stop() - respHeaderTimer = timer.C + var respHeaderRecv chan struct{} + if d := cc.responseHeaderTimeout(); d != 0 { + timer := time.NewTimer(d) + defer timer.Stop() + respHeaderTimer = timer.C + respHeaderRecv = cs.respHeaderRecv + } + // Wait until the peer half-closes its end of the stream, + // or until the request is aborted (via context, error, or otherwise), + // whichever comes first. + for { + select { + case <-cs.peerClosed: + return nil + case <-respHeaderTimer: + return http2errTimeout + case <-respHeaderRecv: + respHeaderTimer = nil // keep waiting for END_STREAM + case <-cs.abort: + return cs.abortErr + case <-ctx.Done(): + return ctx.Err() + case <-req.Cancel: + return http2errRequestCanceled } } +} - readLoopResCh := cs.resc - bodyWritten := false +func (cs *http2clientStream) encodeAndWriteHeaders() error { + cc := cs.cc + req := cs.req ctx := req.Context() - handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) { - res := re.res - if re.err != nil || res.StatusCode > 299 { - // On error or status code 3xx, 4xx, 5xx, etc abort any - // ongoing write, assuming that the server doesn't care - // about our request body. If the server replied with 1xx or - // 2xx, however, then assume the server DOES potentially - // want our body (e.g. full-duplex streaming: - // golang.org/issue/13444). If it turns out the server - // doesn't, they'll RST_STREAM us soon enough. This is a - // heuristic to avoid adding knobs to Transport. Hopefully - // we can keep it. - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWrite) - if hasBody && !bodyWritten { - <-bodyWriter.resc - } - } - if re.err != nil { - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), re.err - } - res.Request = req - res.TLS = cc.tlsState - return res, false, nil + cc.wmu.Lock() + defer cc.wmu.Unlock() + + // If the request was canceled while waiting for cc.mu, just quit. + select { + case <-cs.abort: + return cs.abortErr + case <-ctx.Done(): + return ctx.Err() + case <-req.Cancel: + return http2errRequestCanceled + default: } - handleError := func(err error) (*Response, bool, error) { - if !hasBody || bodyWritten { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - } else { - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) - <-bodyWriter.resc + // Encode headers. + // + // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is + // sent by writeRequestBody below, along with any Trailers, + // again in form HEADERS{1}, CONTINUATION{0,}) + trailers, err := http2commaSeparatedTrailers(cs.req) + if err != nil { + return err + } + hasTrailers := trailers != "" + contentLen := http2actualContentLength(cs.req) + hasBody := contentLen != 0 + hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen) + if err != nil { + return err + } + + // Write the request. + endStream := !hasBody && !hasTrailers + cs.sentHeaders = true + err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) + http2traceWroteHeaders(cs.trace) + return err +} + +// cleanupWriteRequest performs post-request tasks. +// +// If err (the result of writeRequest) is non-nil and the stream is not closed, +// cleanupWriteRequest will send a reset to the peer. +func (cs *http2clientStream) cleanupWriteRequest(err error) { + cc := cs.cc + req := cs.req + + if cs.ID == 0 { + // We were canceled before creating the stream, so return our reservation. + cc.decrStreamReservations() + } + + // TODO: write h12Compare test showing whether + // Request.Body is closed by the Transport, + // and in multiple cases: server replies <=299 and >299 + // while still writing request body + if req.Body != nil { + if e := req.Body.Close(); err == nil { + err = e } - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), err } - for { + if err != nil && cs.sentEndStream { + // If the connection is closed immediately after the response is read, + // we may be aborted before finishing up here. If the stream was closed + // cleanly on both sides, there is no error. select { - case re := <-readLoopResCh: - return handleReadLoopResponse(re) - case <-respHeaderTimer: - return handleError(http2errTimeout) - case <-ctx.Done(): - return handleError(ctx.Err()) - case <-req.Cancel: - return handleError(http2errRequestCanceled) - case <-cs.peerReset: - // processResetStream already removed the - // stream from the streams map; no need for - // forgetStreamID. - return nil, cs.getStartedWrite(), cs.resetErr - case err := <-bodyWriter.resc: - bodyWritten = true - // Prefer the read loop's response, if available. Issue 16102. - select { - case re := <-readLoopResCh: - return handleReadLoopResponse(re) - default: - } - if err != nil { - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), err - } - if d := cc.responseHeaderTimeout(); d != 0 { - timer := time.NewTimer(d) - defer timer.Stop() - respHeaderTimer = timer.C + case <-cs.peerClosed: + err = nil + default: + } + } + if err != nil { + cs.abortStream(err) // possibly redundant, but harmless + if cs.sentHeaders { + if se, ok := err.(http2StreamError); ok { + if se.Cause != http2errFromPeer { + cc.writeStreamReset(cs.ID, se.Code, err) + } + } else { + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) } } + cs.bufPipe.CloseWithError(err) // no-op if already closed + } else { + if cs.sentHeaders && !cs.sentEndStream { + cc.writeStreamReset(cs.ID, http2ErrCodeNo, nil) + } + cs.bufPipe.CloseWithError(http2errRequestCanceled) + } + if cs.ID != 0 { + cc.forgetStreamID(cs.ID) + } + close(cs.donec) + + cc.wmu.Lock() + werr := cc.werr + cc.wmu.Unlock() + if werr != nil { + cc.Close() } } -// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. +// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams. // Must hold cc.mu. -func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error { - var waitingForConn chan struct{} - var waitingForConnErr error // guarded by cc.mu +func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error { for { cc.lastActive = time.Now() if cc.closed || !cc.canTakeNewRequestLocked() { - if waitingForConn != nil { - close(waitingForConn) - } return http2errClientConnUnusable } cc.lastIdle = time.Time{} - if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { - if waitingForConn != nil { - close(waitingForConn) - } + if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) { return nil } - // Unfortunately, we cannot wait on a condition variable and channel at - // the same time, so instead, we spin up a goroutine to check if the - // request is canceled while we wait for a slot to open in the connection. - if waitingForConn == nil { - waitingForConn = make(chan struct{}) - go func() { - if err := http2awaitRequestCancel(req, waitingForConn); err != nil { - cc.mu.Lock() - waitingForConnErr = err - cc.cond.Broadcast() - cc.mu.Unlock() - } - }() - } cc.pendingRequests++ cc.cond.Wait() cc.pendingRequests-- - if waitingForConnErr != nil { - return waitingForConnErr + select { + case <-cs.abort: + return cs.abortErr + default: } } } @@ -7844,10 +8082,6 @@ func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, maxFram cc.fr.WriteContinuation(streamID, endHeaders, chunk) } } - // TODO(bradfitz): this Flush could potentially block (as - // could the WriteHeaders call(s) above), which means they - // wouldn't respond to Request.Cancel being readable. That's - // rare, but this should probably be in a goroutine. cc.bw.Flush() return cc.werr } @@ -7889,28 +8123,10 @@ func (cs *http2clientStream) frameScratchBufferLen(maxFrameSize int) int { var http2bufPool sync.Pool // of *[]byte -func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { +func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) { cc := cs.cc sentEnd := false // whether we sent the final DATA frame w/ END_STREAM - defer func() { - http2traceWroteRequest(cs.trace, err) - // TODO: write h12Compare test showing whether - // Request.Body is closed by the Transport, - // and in multiple cases: server replies <=299 and >299 - // while still writing request body - var cerr error - cc.mu.Lock() - if cs.stopReqBody == nil { - cs.stopReqBody = http2errStopReqBodyWrite - cerr = bodyCloser.Close() - } - cc.mu.Unlock() - if err == nil { - err = cerr - } - }() - req := cs.req hasTrailers := req.Trailer != nil remainLen := http2actualContentLength(req) @@ -7951,7 +8167,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos } if remainLen < 0 { err = http2errReqBodyTooLong - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) return err } } @@ -7959,7 +8174,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos sawEOF = true err = nil } else if err != nil { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) return err } @@ -7971,7 +8185,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos case err == http2errStopReqBodyWrite: return err case err == http2errStopReqBodyWriteAndCancel: - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) return err case err != nil: return err @@ -8004,19 +8217,15 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos return nil } + cc.wmu.Lock() var trls []byte if hasTrailers { - cc.mu.Lock() trls, err = cc.encodeTrailers(req) - cc.mu.Unlock() if err != nil { - cc.writeStreamReset(cs.ID, http2ErrCodeInternal, err) - cc.forgetStreamID(cs.ID) + cc.wmu.Unlock() return err } } - - cc.wmu.Lock() defer cc.wmu.Unlock() // Two ways to send END_STREAM: either with trailers, or @@ -8038,6 +8247,8 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos // if the stream is dead. func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { cc := cs.cc + req := cs.req + ctx := req.Context() cc.mu.Lock() defer cc.mu.Unlock() for { @@ -8047,8 +8258,14 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er if cs.stopReqBody != nil { return 0, cs.stopReqBody } - if err := cs.checkResetOrDone(); err != nil { - return 0, err + select { + case <-cs.abort: + return 0, cs.abortErr + case <-ctx.Done(): + return 0, ctx.Err() + case <-req.Cancel: + return 0, http2errRequestCanceled + default: } if a := cs.flow.available(); a > 0 { take := a @@ -8066,9 +8283,14 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er } } -// requires cc.mu be held. +var http2errNilRequestURL = errors.New("http2: Request.URI is nil") + +// requires cc.wmu be held. func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { cc.hbuf.Reset() + if req.URL == nil { + return nil, http2errNilRequestURL + } host := req.Host if host == "" { @@ -8254,7 +8476,7 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool { } } -// requires cc.mu be held. +// requires cc.wmu be held. func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) { cc.hbuf.Reset() @@ -8299,51 +8521,51 @@ type http2resAndError struct { } // requires cc.mu be held. -func (cc *http2ClientConn) newStream() *http2clientStream { - cs := &http2clientStream{ - cc: cc, - ID: cc.nextStreamID, - resc: make(chan http2resAndError, 1), - peerReset: make(chan struct{}), - done: make(chan struct{}), - } +func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) { cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) cs.inflow.add(http2transportDefaultStreamFlow) cs.inflow.setConnFlow(&cc.inflow) + cs.ID = cc.nextStreamID cc.nextStreamID += 2 cc.streams[cs.ID] = cs - return cs + if cs.ID == 0 { + panic("assigned stream ID 0") + } } func (cc *http2ClientConn) forgetStreamID(id uint32) { - cc.streamByID(id, true) -} - -func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream { cc.mu.Lock() - defer cc.mu.Unlock() - cs := cc.streams[id] - if andRemove && cs != nil && !cc.closed { - cc.lastActive = time.Now() - delete(cc.streams, id) - if len(cc.streams) == 0 && cc.idleTimer != nil { - cc.idleTimer.Reset(cc.idleTimeout) - cc.lastIdle = time.Now() - } - close(cs.done) - // Wake up checkResetOrDone via clientStream.awaitFlowControl and - // wake up RoundTrip if there is a pending request. - cc.cond.Broadcast() + slen := len(cc.streams) + delete(cc.streams, id) + if len(cc.streams) != slen-1 { + panic("forgetting unknown stream id") + } + cc.lastActive = time.Now() + if len(cc.streams) == 0 && cc.idleTimer != nil { + cc.idleTimer.Reset(cc.idleTimeout) + cc.lastIdle = time.Now() + } + // Wake up writeRequestBody via clientStream.awaitFlowControl and + // wake up RoundTrip if there is a pending request. + cc.cond.Broadcast() + + closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() + if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 { + if http2VerboseLogs { + cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2) + } + cc.closed = true + defer cc.tconn.Close() } - return cs + + cc.mu.Unlock() } // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. type http2clientConnReadLoop struct { - _ http2incomparable - cc *http2ClientConn - closeWhenIdle bool + _ http2incomparable + cc *http2ClientConn } // readLoop runs in its own goroutine and reads and dispatches frames. @@ -8403,23 +8625,43 @@ func (rl *http2clientConnReadLoop) cleanup() { } else if err == io.EOF { err = io.ErrUnexpectedEOF } + cc.closed = true for _, cs := range cc.streams { - cs.bufPipe.CloseWithError(err) // no-op if already closed - select { - case cs.resc <- http2resAndError{err: err}: - default: - } - close(cs.done) + cs.abortStreamLocked(err) } - cc.closed = true cc.cond.Broadcast() cc.mu.Unlock() } +// countReadFrameError calls Transport.CountError with a string +// representing err. +func (cc *http2ClientConn) countReadFrameError(err error) { + f := cc.t.CountError + if f == nil || err == nil { + return + } + if ce, ok := err.(http2ConnectionError); ok { + errCode := http2ErrCode(ce) + f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken())) + return + } + if errors.Is(err, io.EOF) { + f("read_frame_eof") + return + } + if errors.Is(err, io.ErrUnexpectedEOF) { + f("read_frame_unexpected_eof") + return + } + if errors.Is(err, http2ErrFrameTooLarge) { + f("read_frame_too_large") + return + } + f("read_frame_other") +} + func (rl *http2clientConnReadLoop) run() error { cc := rl.cc - rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse - gotReply := false // ever saw a HEADERS reply gotSettings := false readIdleTimeout := cc.t.ReadIdleTimeout var t *time.Timer @@ -8436,9 +8678,7 @@ func (rl *http2clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { - if cs := cc.streamByID(se.StreamID, false); cs != nil { - cs.cc.writeStreamReset(cs.ID, se.Code, err) - cs.cc.forgetStreamID(cs.ID) + if cs := rl.streamByID(se.StreamID); cs != nil { if se.Cause == nil { se.Cause = cc.fr.errDetail } @@ -8446,6 +8686,7 @@ func (rl *http2clientConnReadLoop) run() error { } continue } else if err != nil { + cc.countReadFrameError(err) return err } if http2VerboseLogs { @@ -8458,22 +8699,16 @@ func (rl *http2clientConnReadLoop) run() error { } gotSettings = true } - maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *http2MetaHeadersFrame: err = rl.processHeaders(f) - maybeIdle = true - gotReply = true case *http2DataFrame: err = rl.processData(f) - maybeIdle = true case *http2GoAwayFrame: err = rl.processGoAway(f) - maybeIdle = true case *http2RSTStreamFrame: err = rl.processResetStream(f) - maybeIdle = true case *http2SettingsFrame: err = rl.processSettings(f) case *http2PushPromiseFrame: @@ -8491,38 +8726,24 @@ func (rl *http2clientConnReadLoop) run() error { } return err } - if rl.closeWhenIdle && gotReply && maybeIdle { - cc.closeIfIdle() - } } } func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { - cc := rl.cc - cs := cc.streamByID(f.StreamID, false) + cs := rl.streamByID(f.StreamID) if cs == nil { // We'd get here if we canceled a request while the // server had its response still in flight. So if this // was just something we canceled, ignore it. return nil } - if f.StreamEnded() { - // Issue 20521: If the stream has ended, streamByID() causes - // clientStream.done to be closed, which causes the request's bodyWriter - // to be closed with an errStreamClosed, which may be received by - // clientConn.RoundTrip before the result of processing these headers. - // Deferring stream closure allows the header processing to occur first. - // clientConn.RoundTrip may still receive the bodyWriter error first, but - // the fix for issue 16102 prioritises any response. - // - // Issue 22413: If there is no request body, we should close the - // stream before writing to cs.resc so that the stream is closed - // immediately once RoundTrip returns. - if cs.req.Body != nil { - defer cc.forgetStreamID(f.StreamID) - } else { - cc.forgetStreamID(f.StreamID) - } + if cs.readClosed { + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + Cause: errors.New("protocol error: headers after END_STREAM"), + }) + return nil } if !cs.firstByte { if cs.trace != nil { @@ -8546,9 +8767,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro return err } // Any other error type is a stream error. - cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err) - cc.forgetStreamID(cs.ID) - cs.resc <- http2resAndError{err: err} + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + Cause: err, + }) return nil // return nil from process* funcs to keep conn alive } if res == nil { @@ -8556,7 +8779,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro return nil } cs.resTrailer = &res.Trailer - cs.resc <- http2resAndError{res: res} + cs.res = res + close(cs.respHeaderRecv) + if f.StreamEnded() { + rl.endStream(cs) + } return nil } @@ -8618,6 +8845,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http } if statusCode >= 100 && statusCode <= 199 { + if f.StreamEnded() { + return nil, errors.New("1xx informational response with END_STREAM flag") + } cs.num1xx++ const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http if cs.num1xx > max1xxResponses { @@ -8630,8 +8860,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http } if statusCode == 100 { http2traceGot100Continue(cs.trace) - if cs.on100 != nil { - cs.on100() // forces any write delay timer to fire + select { + case cs.on100 <- struct{}{}: + default: } } cs.pastHeaders = false // do it all again @@ -8660,10 +8891,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http return res, nil } - cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}} + cs.bufPipe.setBuffer(&http2dataBuffer{expected: res.ContentLength}) cs.bytesRemain = res.ContentLength res.Body = http2transportResponseBody{cs} - go cs.awaitRequestCancel(cs.req) if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") @@ -8723,7 +8953,7 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { n = int(cs.bytesRemain) if err == nil { err = errors.New("net/http: server replied with more than declared Content-Length; truncated") - cc.writeStreamReset(cs.ID, http2ErrCodeProtocol, err) + cs.abortStream(err) } cs.readErr = err return int(cs.bytesRemain), err @@ -8741,8 +8971,6 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { } cc.mu.Lock() - defer cc.mu.Unlock() - var connAdd, streamAdd int32 // Check the conn-level first, before the stream-level. if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { @@ -8759,6 +8987,8 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { cs.inflow.add(streamAdd) } } + cc.mu.Unlock() + if connAdd != 0 || streamAdd != 0 { cc.wmu.Lock() defer cc.wmu.Unlock() @@ -8779,34 +9009,40 @@ func (b http2transportResponseBody) Close() error { cs := b.cs cc := cs.cc - serverSentStreamEnd := cs.bufPipe.Err() == io.EOF unread := cs.bufPipe.Len() - - if unread > 0 || !serverSentStreamEnd { + if unread > 0 { cc.mu.Lock() - cc.wmu.Lock() - if !serverSentStreamEnd { - cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) - cs.didReset = true - } // Return connection-level flow control. if unread > 0 { cc.inflow.add(int32(unread)) + } + cc.mu.Unlock() + + cc.wmu.Lock() + // Return connection-level flow control. + if unread > 0 { cc.fr.WriteWindowUpdate(0, uint32(unread)) } cc.bw.Flush() cc.wmu.Unlock() - cc.mu.Unlock() } cs.bufPipe.BreakWithError(http2errClosedResponseBody) - cc.forgetStreamID(cs.ID) + cs.abortStream(http2errClosedResponseBody) + + select { + case <-cs.donec: + case <-cs.req.Context().Done(): + return cs.req.Context().Err() + case <-cs.req.Cancel: + return http2errRequestCanceled + } return nil } func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { cc := rl.cc - cs := cc.streamByID(f.StreamID, f.StreamEnded()) + cs := rl.streamByID(f.StreamID) data := f.Data() if cs == nil { cc.mu.Lock() @@ -8835,6 +9071,14 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { } return nil } + if cs.readClosed { + cc.logf("protocol error: received DATA after END_STREAM") + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + }) + return nil + } if !cs.firstByte { cc.logf("protocol error: received DATA before a HEADERS frame") rl.endStreamError(cs, http2StreamError{ @@ -8866,30 +9110,39 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { if pad := int(f.Length) - len(data); pad > 0 { refund += pad } - // Return len(data) now if the stream is already closed, - // since data will never be read. - didReset := cs.didReset - if didReset { - refund += len(data) + + didReset := false + var err error + if len(data) > 0 { + if _, err = cs.bufPipe.Write(data); err != nil { + // Return len(data) now if the stream is already closed, + // since data will never be read. + didReset = true + refund += len(data) + } } + if refund > 0 { cc.inflow.add(int32(refund)) + if !didReset { + cs.inflow.add(int32(refund)) + } + } + cc.mu.Unlock() + + if refund > 0 { cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund)) if !didReset { - cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock() } - cc.mu.Unlock() - if len(data) > 0 && !didReset { - if _, err := cs.bufPipe.Write(data); err != nil { - rl.endStreamError(cs, err) - return err - } + if err != nil { + rl.endStreamError(cs, err) + return nil } } @@ -8902,24 +9155,26 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) { // TODO: check that any declared content-length matches, like // server.go's (*stream).endStream method. - rl.endStreamError(cs, nil) + if !cs.readClosed { + cs.readClosed = true + cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers) + close(cs.peerClosed) + } } func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err error) { - var code func() - if err == nil { - err = io.EOF - code = cs.copyTrailers - } - if http2isConnectionCloseRequest(cs.req) { - rl.closeWhenIdle = true - } - cs.bufPipe.closeWithErrorAndCode(err, code) + cs.readAborted = true + cs.abortStream(err) +} - select { - case cs.resc <- http2resAndError{err: err}: - default: +func (rl *http2clientConnReadLoop) streamByID(id uint32) *http2clientStream { + rl.cc.mu.Lock() + defer rl.cc.mu.Unlock() + cs := rl.cc.streams[id] + if cs != nil && !cs.readAborted { + return cs } + return nil } func (cs *http2clientStream) copyTrailers() { @@ -8938,12 +9193,33 @@ func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error { if f.ErrCode != 0 { // TODO: deal with GOAWAY more. particularly the error code cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) + if fn := cc.t.CountError; fn != nil { + fn("recv_goaway_" + f.ErrCode.stringToken()) + } + } cc.setGoAway(f) return nil } func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error { + cc := rl.cc + // Locking both mu and wmu here allows frame encoding to read settings with only wmu held. + // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless. + cc.wmu.Lock() + defer cc.wmu.Unlock() + + if err := rl.processSettingsNoWrite(f); err != nil { + return err + } + if !f.IsAck() { + cc.fr.WriteSettingsAck() + cc.bw.Flush() + } + return nil +} + +func (rl *http2clientConnReadLoop) processSettingsNoWrite(f *http2SettingsFrame) error { cc := rl.cc cc.mu.Lock() defer cc.mu.Unlock() @@ -8956,12 +9232,14 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error return http2ConnectionError(http2ErrCodeProtocol) } + var seenMaxConcurrentStreams bool err := f.ForeachSetting(func(s http2Setting) error { switch s.ID { case http2SettingMaxFrameSize: cc.maxFrameSize = s.Val case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val + seenMaxConcurrentStreams = true case http2SettingMaxHeaderListSize: cc.peerMaxHeaderListSize = uint64(s.Val) case http2SettingInitialWindowSize: @@ -8993,17 +9271,23 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error return err } - cc.wmu.Lock() - defer cc.wmu.Unlock() + if !cc.seenSettings { + if !seenMaxConcurrentStreams { + // This was the servers initial SETTINGS frame and it + // didn't contain a MAX_CONCURRENT_STREAMS field so + // increase the number of concurrent streams this + // connection can establish to our default. + cc.maxConcurrentStreams = http2defaultMaxConcurrentStreams + } + cc.seenSettings = true + } - cc.fr.WriteSettingsAck() - cc.bw.Flush() - return cc.werr + return nil } func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error { cc := rl.cc - cs := cc.streamByID(f.StreamID, false) + cs := rl.streamByID(f.StreamID) if f.StreamID != 0 && cs == nil { return nil } @@ -9023,24 +9307,22 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame } func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error { - cs := rl.cc.streamByID(f.StreamID, true) + cs := rl.streamByID(f.StreamID) if cs == nil { - // TODO: return error if server tries to RST_STEAM an idle stream + // TODO: return error if server tries to RST_STREAM an idle stream return nil } - select { - case <-cs.peerReset: - // Already reset. - // This is the only goroutine - // which closes this, so there - // isn't a race. - default: - err := http2streamError(cs.ID, f.ErrCode) - cs.resetErr = err - close(cs.peerReset) - cs.bufPipe.CloseWithError(err) - cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl + serr := http2streamError(cs.ID, f.ErrCode) + serr.Cause = http2errFromPeer + if f.ErrCode == http2ErrCodeProtocol { + rl.cc.SetDoNotReuse() } + if fn := cs.cc.t.CountError; fn != nil { + fn("recv_rststream_" + f.ErrCode.stringToken()) + } + cs.abortStream(serr) + + cs.bufPipe.CloseWithError(serr) return nil } @@ -9197,87 +9479,6 @@ type http2errorReader struct{ err error } func (r http2errorReader) Read(p []byte) (int, error) { return 0, r.err } -// bodyWriterState encapsulates various state around the Transport's writing -// of the request body, particularly regarding doing delayed writes of the body -// when the request contains "Expect: 100-continue". -type http2bodyWriterState struct { - cs *http2clientStream - timer *time.Timer // if non-nil, we're doing a delayed write - fnonce *sync.Once // to call fn with - fn func() // the code to run in the goroutine, writing the body - resc chan error // result of fn's execution - delay time.Duration // how long we should delay a delayed write for -} - -func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { - s.cs = cs - if body == nil { - return - } - resc := make(chan error, 1) - s.resc = resc - s.fn = func() { - cs.cc.mu.Lock() - cs.startedWrite = true - cs.cc.mu.Unlock() - resc <- cs.writeRequestBody(body, cs.req.Body) - } - s.delay = t.expectContinueTimeout() - if s.delay == 0 || - !httpguts.HeaderValuesContainsToken( - cs.req.Header["Expect"], - "100-continue") { - return - } - s.fnonce = new(sync.Once) - - // Arm the timer with a very large duration, which we'll - // intentionally lower later. It has to be large now because - // we need a handle to it before writing the headers, but the - // s.delay value is defined to not start until after the - // request headers were written. - const hugeDuration = 365 * 24 * time.Hour - s.timer = time.AfterFunc(hugeDuration, func() { - s.fnonce.Do(s.fn) - }) - return -} - -func (s http2bodyWriterState) cancel() { - if s.timer != nil { - if s.timer.Stop() { - s.resc <- nil - } - } -} - -func (s http2bodyWriterState) on100() { - if s.timer == nil { - // If we didn't do a delayed write, ignore the server's - // bogus 100 continue response. - return - } - s.timer.Stop() - go func() { s.fnonce.Do(s.fn) }() -} - -// scheduleBodyWrite starts writing the body, either immediately (in -// the common case) or after the delay timeout. It should not be -// called until after the headers have been written. -func (s http2bodyWriterState) scheduleBodyWrite() { - if s.timer == nil { - // We're not doing a delayed write (see - // getBodyWriterState), so just start the writing - // goroutine immediately. - go s.fn() - return - } - http2traceWait100Continue(s.cs.trace) - if s.timer.Stop() { - s.timer.Reset(s.delay) - } -} - // isConnectionCloseRequest reports whether req should use its own // connection for a single request and then close the connection. func http2isConnectionCloseRequest(req *Request) bool { diff --git a/src/vendor/modules.txt b/src/vendor/modules.txt index 3dc867957e486..250c87a6637a1 100644 --- a/src/vendor/modules.txt +++ b/src/vendor/modules.txt @@ -9,7 +9,7 @@ golang.org/x/crypto/curve25519/internal/field golang.org/x/crypto/hkdf golang.org/x/crypto/internal/subtle golang.org/x/crypto/poly1305 -# golang.org/x/net v0.0.0-20210825183410-e898025ed96a +# golang.org/x/net v0.0.0-20211004220534-69340ce214a7 ## explicit; go 1.17 golang.org/x/net/dns/dnsmessage golang.org/x/net/http/httpguts