diff options
Diffstat (limited to 'src/net/http/h2_bundle.go')
-rw-r--r-- | src/net/http/h2_bundle.go | 1674 |
1 files changed, 948 insertions, 726 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 22f1e78498..0fd8da1d15 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -52,6 +52,48 @@ import ( "golang.org/x/net/idna" ) +// asciiEqualFold is strings.EqualFold, ASCII only. It reports whether s and t +// are equal, ASCII-case-insensitively. +func http2asciiEqualFold(s, t string) bool { + if len(s) != len(t) { + return false + } + for i := 0; i < len(s); i++ { + if http2lower(s[i]) != http2lower(t[i]) { + return false + } + } + return true +} + +// lower returns the ASCII lowercase version of b. +func http2lower(b byte) byte { + if 'A' <= b && b <= 'Z' { + return b + ('a' - 'A') + } + return b +} + +// isASCIIPrint returns whether s is ASCII and printable according to +// https://tools.ietf.org/html/rfc20#section-4.2. +func http2isASCIIPrint(s string) bool { + for i := 0; i < len(s); i++ { + if s[i] < ' ' || s[i] > '~' { + return false + } + } + return true +} + +// asciiToLower returns the lowercase version of s if s is ASCII and printable, +// and whether or not it was. +func http2asciiToLower(s string) (lower string, ok bool) { + if !http2isASCIIPrint(s) { + return "", false + } + return strings.ToLower(s), true +} + // A list of the possible cipher suite ids. Taken from // https://www.iana.org/assignments/tls-parameters/tls-parameters.txt @@ -690,6 +732,12 @@ func http2isBadCipher(cipher uint16) bool { // ClientConnPool manages a pool of HTTP/2 client connections. type http2ClientConnPool interface { + // GetClientConn returns a specific HTTP/2 connection (usually + // a TLS-TCP connection) to an HTTP/2 server. On success, the + // returned ClientConn accounts for the upcoming RoundTrip + // call, so the caller should not omit it. If the caller needs + // to, ClientConn.RoundTrip can be called with a bogus + // new(http.Request) to release the stream reservation. GetClientConn(req *Request, addr string) (*http2ClientConn, error) MarkDead(*http2ClientConn) } @@ -716,7 +764,7 @@ type http2clientConnPool struct { conns map[string][]*http2ClientConn // key is host:port dialing map[string]*http2dialCall // currently in-flight dials keys map[*http2ClientConn][]string - addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeede calls + addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeeded calls } func (p *http2clientConnPool) GetClientConn(req *Request, addr string) (*http2ClientConn, error) { @@ -728,87 +776,85 @@ const ( http2noDialOnMiss = false ) -// shouldTraceGetConn reports whether getClientConn should call any -// ClientTrace.GetConn hook associated with the http.Request. -// -// This complexity is needed to avoid double calls of the GetConn hook -// during the back-and-forth between net/http and x/net/http2 (when the -// net/http.Transport is upgraded to also speak http2), as well as support -// the case where x/net/http2 is being used directly. -func (p *http2clientConnPool) shouldTraceGetConn(st http2clientConnIdleState) bool { - // If our Transport wasn't made via ConfigureTransport, always - // trace the GetConn hook if provided, because that means the - // http2 package is being used directly and it's the one - // dialing, as opposed to net/http. - if _, ok := p.t.ConnPool.(http2noDialClientConnPool); !ok { - return true - } - // Otherwise, only use the GetConn hook if this connection has - // been used previously for other requests. For fresh - // connections, the net/http package does the dialing. - return !st.freshConn -} - func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) { + // TODO(dneil): Dial a new connection when t.DisableKeepAlives is set? if http2isConnectionCloseRequest(req) && dialOnMiss { // It gets its own connection. http2traceGetConn(req, addr) const singleUse = true - cc, err := p.t.dialClientConn(addr, singleUse) + cc, err := p.t.dialClientConn(req.Context(), addr, singleUse) if err != nil { return nil, err } return cc, nil } - p.mu.Lock() - for _, cc := range p.conns[addr] { - if st := cc.idleState(); st.canTakeNewRequest { - if p.shouldTraceGetConn(st) { - http2traceGetConn(req, addr) + for { + p.mu.Lock() + for _, cc := range p.conns[addr] { + if cc.ReserveNewRequest() { + // When a connection is presented to us by the net/http package, + // the GetConn hook has already been called. + // Don't call it a second time here. + if !cc.getConnCalled { + http2traceGetConn(req, addr) + } + cc.getConnCalled = false + p.mu.Unlock() + return cc, nil } + } + if !dialOnMiss { p.mu.Unlock() - return cc, nil + return nil, http2ErrNoCachedConn } - } - if !dialOnMiss { + http2traceGetConn(req, addr) + call := p.getStartDialLocked(req.Context(), addr) p.mu.Unlock() - return nil, http2ErrNoCachedConn + <-call.done + if http2shouldRetryDial(call, req) { + continue + } + cc, err := call.res, call.err + if err != nil { + return nil, err + } + if cc.ReserveNewRequest() { + return cc, nil + } } - http2traceGetConn(req, addr) - call := p.getStartDialLocked(addr) - p.mu.Unlock() - <-call.done - return call.res, call.err } // dialCall is an in-flight Transport dial call to a host. type http2dialCall struct { - _ http2incomparable - p *http2clientConnPool + _ http2incomparable + p *http2clientConnPool + // the context associated with the request + // that created this dialCall + ctx context.Context done chan struct{} // closed when done res *http2ClientConn // valid after done is closed err error // valid after done is closed } // requires p.mu is held. -func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall { +func (p *http2clientConnPool) getStartDialLocked(ctx context.Context, addr string) *http2dialCall { if call, ok := p.dialing[addr]; ok { // A dial is already in-flight. Don't start another. return call } - call := &http2dialCall{p: p, done: make(chan struct{})} + call := &http2dialCall{p: p, done: make(chan struct{}), ctx: ctx} if p.dialing == nil { p.dialing = make(map[string]*http2dialCall) } p.dialing[addr] = call - go call.dial(addr) + go call.dial(call.ctx, addr) return call } // run in its own goroutine. -func (c *http2dialCall) dial(addr string) { +func (c *http2dialCall) dial(ctx context.Context, addr string) { const singleUse = false // shared conn - c.res, c.err = c.p.t.dialClientConn(addr, singleUse) + c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse) close(c.done) c.p.mu.Lock() @@ -871,6 +917,7 @@ func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { if err != nil { c.err = err } else { + cc.getConnCalled = true // already called by the net/http package p.addConnLocked(key, cc) } delete(p.addConnCalls, key) @@ -953,6 +1000,31 @@ func (p http2noDialClientConnPool) GetClientConn(req *Request, addr string) (*ht return p.getClientConn(req, addr, http2noDialOnMiss) } +// shouldRetryDial reports whether the current request should +// retry dialing after the call finished unsuccessfully, for example +// if the dial was canceled because of a context cancellation or +// deadline expiry. +func http2shouldRetryDial(call *http2dialCall, req *Request) bool { + if call.err == nil { + // No error, no need to retry + return false + } + if call.ctx == req.Context() { + // If the call has the same context as the request, the dial + // should not be retried, since any cancellation will have come + // from this request. + return false + } + if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) { + // If the call error is not because of a context cancellation or a deadline expiry, + // the dial should not be retried. + return false + } + // Only retry if the error is a context cancellation error or deadline expiry + // and the context associated with the call was canceled or expired. + return call.ctx.Err() != nil +} + // Buffer chunks are allocated from a pool to reduce pressure on GC. // The maximum wasted space per dataBuffer is 2x the largest size class, // which happens when the dataBuffer has multiple chunks and there is @@ -1148,6 +1220,11 @@ type http2StreamError struct { Cause error // optional additional detail } +// errFromPeer is a sentinel error value for StreamError.Cause to +// indicate that the StreamError was sent from the peer over the wire +// and wasn't locally generated in the Transport. +var http2errFromPeer = errors.New("received from peer") + func http2streamError(id uint32, code http2ErrCode) http2StreamError { return http2StreamError{StreamID: id, Code: code} } @@ -2261,7 +2338,7 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) ( return nil, err } } - if len(p)-int(padLength) <= 0 { + if len(p)-int(padLength) < 0 { return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) } hf.headerFragBuf = p[:len(p)-int(padLength)] @@ -3094,12 +3171,12 @@ func http2buildCommonHeaderMaps() { } } -func http2lowerHeader(v string) string { +func http2lowerHeader(v string) (lower string, ascii bool) { http2buildCommonHeaderMapsOnce() if s, ok := http2commonLowerHeader[v]; ok { - return s + return s, true } - return strings.ToLower(v) + return http2asciiToLower(v) } var ( @@ -3480,6 +3557,17 @@ type http2pipeBuffer interface { io.Reader } +// setBuffer initializes the pipe buffer. +// It has no effect if the pipe is already closed. +func (p *http2pipe) setBuffer(b http2pipeBuffer) { + p.mu.Lock() + defer p.mu.Unlock() + if p.err != nil || p.breakErr != nil { + return + } + p.b = b +} + func (p *http2pipe) Len() int { p.mu.Lock() defer p.mu.Unlock() @@ -3831,16 +3919,12 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { s.TLSConfig.PreferServerCipherSuites = true - haveNPN := false - for _, p := range s.TLSConfig.NextProtos { - if p == http2NextProtoTLS { - haveNPN = true - break - } - } - if !haveNPN { + if !http2strSliceContains(s.TLSConfig.NextProtos, http2NextProtoTLS) { s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, http2NextProtoTLS) } + if !http2strSliceContains(s.TLSConfig.NextProtos, "http/1.1") { + s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "http/1.1") + } if s.TLSNextProto == nil { s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){} @@ -4873,7 +4957,9 @@ func (sc *http2serverConn) startGracefulShutdown() { sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) }) } -// After sending GOAWAY, the connection will close after goAwayTimeout. +// After sending GOAWAY with an error code (non-graceful shutdown), the +// connection will close after goAwayTimeout. +// // If we close the connection immediately after sending GOAWAY, there may // be unsent data in our kernel receive buffer, which will cause the kernel // to send a TCP RST on close() instead of a FIN. This RST will abort the @@ -5209,23 +5295,37 @@ func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { func (sc *http2serverConn) processData(f *http2DataFrame) error { sc.serveG.check() - if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo { + id := f.Header().StreamID + if sc.inGoAway && (sc.goAwayCode != http2ErrCodeNo || id > sc.maxClientStreamID) { + // Discard all DATA frames if the GOAWAY is due to an + // error, or: + // + // Section 6.8: After sending a GOAWAY frame, the sender + // can discard frames for streams initiated by the + // receiver with identifiers higher than the identified + // last stream. return nil } - data := f.Data() - // "If a DATA frame is received whose stream is not in "open" - // or "half closed (local)" state, the recipient MUST respond - // with a stream error (Section 5.4.2) of type STREAM_CLOSED." - id := f.Header().StreamID + data := f.Data() state, st := sc.state(id) if id == 0 || state == http2stateIdle { + // Section 6.1: "DATA frames MUST be associated with a + // stream. If a DATA frame is received whose stream + // identifier field is 0x0, the recipient MUST respond + // with a connection error (Section 5.4.1) of type + // PROTOCOL_ERROR." + // // Section 5.1: "Receiving any frame other than HEADERS // or PRIORITY on a stream in this state MUST be // treated as a connection error (Section 5.4.1) of // type PROTOCOL_ERROR." return http2ConnectionError(http2ErrCodeProtocol) } + + // "If a DATA frame is received whose stream is not in "open" + // or "half closed (local)" state, the recipient MUST respond + // with a stream error (Section 5.4.2) of type STREAM_CLOSED." if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued { // This includes sending a RST_STREAM if the stream is // in stateHalfClosedLocal (which currently means that @@ -6353,8 +6453,12 @@ func (w *http2responseWriter) Push(target string, opts *PushOptions) error { // but PUSH_PROMISE requests cannot have a body. // http://tools.ietf.org/html/rfc7540#section-8.2 // Also disallow Host, since the promised URL must be absolute. - switch strings.ToLower(k) { - case "content-length", "content-encoding", "trailer", "te", "expect", "host": + if http2asciiEqualFold(k, "content-length") || + http2asciiEqualFold(k, "content-encoding") || + http2asciiEqualFold(k, "trailer") || + http2asciiEqualFold(k, "te") || + http2asciiEqualFold(k, "expect") || + http2asciiEqualFold(k, "host") { return fmt.Errorf("promised request headers cannot include %q", k) } } @@ -6562,6 +6666,15 @@ const ( http2transportDefaultStreamMinRefresh = 4 << 10 http2defaultUserAgent = "Go-http-client/2.0" + + // initialMaxConcurrentStreams is a connections maxConcurrentStreams until + // it's received servers initial SETTINGS frame, which corresponds with the + // spec's minimum recommended value. + http2initialMaxConcurrentStreams = 100 + + // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams + // if the server doesn't include one in its initial SETTINGS frame. + http2defaultMaxConcurrentStreams = 1000 ) // Transport is an HTTP/2 Transport. @@ -6738,11 +6851,12 @@ func (t *http2Transport) initConnPool() { // ClientConn is the state of a single HTTP/2 client connection to an // HTTP/2 server. type http2ClientConn struct { - t *http2Transport - tconn net.Conn // usually *tls.Conn, except specialized impls - tlsState *tls.ConnectionState // nil only for specialized impls - reused uint32 // whether conn is being reused; atomic - singleUse bool // whether being used for a single http.Request + t *http2Transport + tconn net.Conn // usually *tls.Conn, except specialized impls + tlsState *tls.ConnectionState // nil only for specialized impls + reused uint32 // whether conn is being reused; atomic + singleUse bool // whether being used for a single http.Request + getConnCalled bool // used by clientConnPool // readLoop goroutine fields: readerDone chan struct{} // closed on error @@ -6755,87 +6869,94 @@ type http2ClientConn struct { cond *sync.Cond // hold mu; broadcast on flow/closed changes flow http2flow // our conn-level flow control quota (cs.flow is per stream) inflow http2flow // peer's conn-level flow control + doNotReuse bool // whether conn is marked to not be reused for any future requests closing bool closed bool + seenSettings bool // true if we've seen a settings frame, false otherwise wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back goAway *http2GoAwayFrame // if non-nil, the GoAwayFrame we received goAwayDebug string // goAway frame's debug data, retained as a string streams map[uint32]*http2clientStream // client-initiated + streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip nextStreamID uint32 pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams pings map[[8]byte]chan struct{} // in flight ping data to notification channel - bw *bufio.Writer br *bufio.Reader - fr *http2Framer lastActive time.Time lastIdle time.Time // time last idle - // Settings from peer: (also guarded by mu) + // Settings from peer: (also guarded by wmu) maxFrameSize uint32 maxConcurrentStreams uint32 peerMaxHeaderListSize uint64 initialWindowSize uint32 - hbuf bytes.Buffer // HPACK encoder writes into this - henc *hpack.Encoder - freeBuf [][]byte + // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests. + // Write to reqHeaderMu to lock it, read from it to unlock. + // Lock reqmu BEFORE mu or wmu. + reqHeaderMu chan struct{} - wmu sync.Mutex // held while writing; acquire AFTER mu if holding both - werr error // first write error that has occurred + // wmu is held while writing. + // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes. + // Only acquire both at the same time when changing peer settings. + wmu sync.Mutex + bw *bufio.Writer + fr *http2Framer + werr error // first write error that has occurred + hbuf bytes.Buffer // HPACK encoder writes into this + henc *hpack.Encoder } // clientStream is the state for a single HTTP/2 stream. One of these // is created for each Transport.RoundTrip call. type http2clientStream struct { - cc *http2ClientConn - req *Request + cc *http2ClientConn + + // Fields of Request that we may access even after the response body is closed. + ctx context.Context + reqCancel <-chan struct{} + trace *httptrace.ClientTrace // or nil ID uint32 - resc chan http2resAndError bufPipe http2pipe // buffered pipe with the flow-controlled response payload - startedWrite bool // started request body write; guarded by cc.mu requestedGzip bool - on100 func() // optional code to run if get a 100 continue response + isHead bool + + abortOnce sync.Once + abort chan struct{} // closed to signal stream should end immediately + abortErr error // set if abort is closed + + peerClosed chan struct{} // closed when the peer sends an END_STREAM flag + donec chan struct{} // closed after the stream is in the closed state + on100 chan struct{} // buffered; written to if a 100 is received + + respHeaderRecv chan struct{} // closed when headers are received + res *Response // set if respHeaderRecv is closed flow http2flow // guarded by cc.mu inflow http2flow // guarded by cc.mu bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read readErr error // sticky read error; owned by transportResponseBody.Read - stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu - didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu - peerReset chan struct{} // closed on peer reset - resetErr error // populated before peerReset is closed + reqBody io.ReadCloser + reqBodyContentLength int64 // -1 means unknown + reqBodyClosed bool // body has been closed; guarded by cc.mu - done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu + // owned by writeRequest: + sentEndStream bool // sent an END_STREAM flag to the peer + sentHeaders bool // owned by clientConnReadLoop: firstByte bool // got the first response byte pastHeaders bool // got first MetaHeadersFrame (actual headers) pastTrailers bool // got optional second MetaHeadersFrame (trailers) num1xx uint8 // number of 1xx responses seen + readClosed bool // peer sent an END_STREAM flag + readAborted bool // read loop reset the stream trailer Header // accumulated trailers resTrailer *Header // client's Response.Trailer } -// awaitRequestCancel waits for the user to cancel a request or for the done -// channel to be signaled. A non-nil error is returned only if the request was -// canceled. -func http2awaitRequestCancel(req *Request, done <-chan struct{}) error { - ctx := req.Context() - if req.Cancel == nil && ctx.Done() == nil { - return nil - } - select { - case <-req.Cancel: - return http2errRequestCanceled - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } -} - var http2got1xxFuncForTests func(int, textproto.MIMEHeader) error // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func, @@ -6847,59 +6968,37 @@ func (cs *http2clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) e return http2traceGot1xxResponseFunc(cs.trace) } -// awaitRequestCancel waits for the user to cancel a request, its context to -// expire, or for the request to be done (any way it might be removed from the -// cc.streams map: peer reset, successful completion, TCP connection breakage, -// etc). If the request is canceled, then cs will be canceled and closed. -func (cs *http2clientStream) awaitRequestCancel(req *Request) { - if err := http2awaitRequestCancel(req, cs.done); err != nil { - cs.cancelStream() - cs.bufPipe.CloseWithError(err) - } +func (cs *http2clientStream) abortStream(err error) { + cs.cc.mu.Lock() + defer cs.cc.mu.Unlock() + cs.abortStreamLocked(err) } -func (cs *http2clientStream) cancelStream() { - cc := cs.cc - cc.mu.Lock() - didReset := cs.didReset - cs.didReset = true - cc.mu.Unlock() - - if !didReset { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - cc.forgetStreamID(cs.ID) +func (cs *http2clientStream) abortStreamLocked(err error) { + cs.abortOnce.Do(func() { + cs.abortErr = err + close(cs.abort) + }) + if cs.reqBody != nil && !cs.reqBodyClosed { + cs.reqBody.Close() + cs.reqBodyClosed = true } -} - -// checkResetOrDone reports any error sent in a RST_STREAM frame by the -// server, or errStreamClosed if the stream is complete. -func (cs *http2clientStream) checkResetOrDone() error { - select { - case <-cs.peerReset: - return cs.resetErr - case <-cs.done: - return http2errStreamClosed - default: - return nil + // TODO(dneil): Clean up tests where cs.cc.cond is nil. + if cs.cc.cond != nil { + // Wake up writeRequestBody if it is waiting on flow control. + cs.cc.cond.Broadcast() } } -func (cs *http2clientStream) getStartedWrite() bool { +func (cs *http2clientStream) abortRequestBodyWrite() { cc := cs.cc cc.mu.Lock() defer cc.mu.Unlock() - return cs.startedWrite -} - -func (cs *http2clientStream) abortRequestBodyWrite(err error) { - if err == nil { - panic("nil error") + if cs.reqBody != nil && !cs.reqBodyClosed { + cs.reqBody.Close() + cs.reqBodyClosed = true + cc.cond.Broadcast() } - cc := cs.cc - cc.mu.Lock() - cs.stopReqBody = err - cc.cond.Broadcast() - cc.mu.Unlock() } type http2stickyErrWriter struct { @@ -6987,9 +7086,9 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1) http2traceGotConn(req, cc, reused) - res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req) + res, err := cc.RoundTrip(req) if err != nil && retry <= 6 { - if req, err = http2shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil { + if req, err = http2shouldRetryRequest(req, err); err == nil { // After the first retry, do exponential backoff with 10% jitter. if retry == 0 { continue @@ -7000,7 +7099,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res case <-time.After(time.Second * time.Duration(backoff)): continue case <-req.Context().Done(): - return nil, req.Context().Err() + err = req.Context().Err() } } } @@ -7031,7 +7130,7 @@ var ( // response headers. It is always called with a non-nil error. // It returns either a request to retry (either the same request, or a // modified clone), or an error if the request can't be replayed. -func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) { +func http2shouldRetryRequest(req *Request, err error) (*Request, error) { if !http2canRetryError(err) { return nil, err } @@ -7044,7 +7143,6 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req // If the request body can be reset back to its original // state via the optional req.GetBody, do that. if req.GetBody != nil { - // TODO: consider a req.Body.Close here? or audit that all caller paths do? body, err := req.GetBody() if err != nil { return nil, err @@ -7056,10 +7154,8 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req // The Request.Body can't reset back to the beginning, but we // don't seem to have started to read from it yet, so reuse - // the request directly. The "afterBodyWrite" means the - // bodyWrite process has started, which becomes true before - // the first Read. - if !afterBodyWrite { + // the request directly. + if err == http2errClientConnUnusable { return req, nil } @@ -7071,17 +7167,21 @@ func http2canRetryError(err error) bool { return true } if se, ok := err.(http2StreamError); ok { + if se.Code == http2ErrCodeProtocol && se.Cause == http2errFromPeer { + // See golang/go#47635, golang/go#42777 + return true + } return se.Code == http2ErrCodeRefusedStream } return false } -func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { +func (t *http2Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*http2ClientConn, error) { host, _, err := net.SplitHostPort(addr) if err != nil { return nil, err } - tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) + tconn, err := t.dialTLS(ctx)("tcp", addr, t.newTLSConfig(host)) if err != nil { return nil, err } @@ -7102,34 +7202,28 @@ func (t *http2Transport) newTLSConfig(host string) *tls.Config { return cfg } -func (t *http2Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { +func (t *http2Transport) dialTLS(ctx context.Context) func(string, string, *tls.Config) (net.Conn, error) { if t.DialTLS != nil { return t.DialTLS } - return t.dialTLSDefault -} - -func (t *http2Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { - cn, err := tls.Dial(network, addr, cfg) - if err != nil { - return nil, err - } - if err := cn.Handshake(); err != nil { - return nil, err - } - if !cfg.InsecureSkipVerify { - if err := cn.VerifyHostname(cfg.ServerName); err != nil { + return func(network, addr string, cfg *tls.Config) (net.Conn, error) { + dialer := &tls.Dialer{ + Config: cfg, + } + cn, err := dialer.DialContext(ctx, network, addr) + if err != nil { return nil, err } + tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed + state := tlsCn.ConnectionState() + if p := state.NegotiatedProtocol; p != http2NextProtoTLS { + return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, http2NextProtoTLS) + } + if !state.NegotiatedProtocolIsMutual { + return nil, errors.New("http2: could not negotiate protocol mutually") + } + return cn, nil } - state := cn.ConnectionState() - if p := state.NegotiatedProtocol; p != http2NextProtoTLS { - return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, http2NextProtoTLS) - } - if !state.NegotiatedProtocolIsMutual { - return nil, errors.New("http2: could not negotiate protocol mutually") - } - return cn, nil } // disableKeepAlives reports whether connections should be closed as @@ -7155,14 +7249,15 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, - maxFrameSize: 16 << 10, // spec default - initialWindowSize: 65535, // spec default - maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. - peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. + peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), + reqHeaderMu: make(chan struct{}, 1), } if d := t.idleConnTimeout(); d != 0 { cc.idleTimeout = d @@ -7232,6 +7327,13 @@ func (cc *http2ClientConn) healthCheck() { } } +// SetDoNotReuse marks cc as not reusable for future HTTP requests. +func (cc *http2ClientConn) SetDoNotReuse() { + cc.mu.Lock() + defer cc.mu.Unlock() + cc.doNotReuse = true +} + func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { cc.mu.Lock() defer cc.mu.Unlock() @@ -7249,27 +7351,39 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { last := f.LastStreamID for streamID, cs := range cc.streams { if streamID > last { - select { - case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}: - default: - } + cs.abortStreamLocked(http2errClientConnGotGoAway) } } } // CanTakeNewRequest reports whether the connection can take a new request, // meaning it has not been closed or received or sent a GOAWAY. +// +// If the caller is going to immediately make a new request on this +// connection, use ReserveNewRequest instead. func (cc *http2ClientConn) CanTakeNewRequest() bool { cc.mu.Lock() defer cc.mu.Unlock() return cc.canTakeNewRequestLocked() } +// ReserveNewRequest is like CanTakeNewRequest but also reserves a +// concurrent stream in cc. The reservation is decremented on the +// next call to RoundTrip. +func (cc *http2ClientConn) ReserveNewRequest() bool { + cc.mu.Lock() + defer cc.mu.Unlock() + if st := cc.idleStateLocked(); !st.canTakeNewRequest { + return false + } + cc.streamsReserved++ + return true +} + // clientConnIdleState describes the suitability of a client // connection to initiate a new RoundTrip request. type http2clientConnIdleState struct { canTakeNewRequest bool - freshConn bool // whether it's unused by any previous request } func (cc *http2ClientConn) idleState() http2clientConnIdleState { @@ -7290,13 +7404,13 @@ func (cc *http2ClientConn) idleStateLocked() (st http2clientConnIdleState) { // writing it. maxConcurrentOkay = true } else { - maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) + maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams) } st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && + !cc.doNotReuse && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked() - st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest return } @@ -7327,7 +7441,7 @@ func (cc *http2ClientConn) onIdleTimeout() { func (cc *http2ClientConn) closeIfIdle() { cc.mu.Lock() - if len(cc.streams) > 0 { + if len(cc.streams) > 0 || cc.streamsReserved > 0 { cc.mu.Unlock() return } @@ -7342,9 +7456,15 @@ func (cc *http2ClientConn) closeIfIdle() { cc.tconn.Close() } +func (cc *http2ClientConn) isDoNotReuseAndIdle() bool { + cc.mu.Lock() + defer cc.mu.Unlock() + return cc.doNotReuse && len(cc.streams) == 0 +} + var http2shutdownEnterWaitStateHook = func() {} -// Shutdown gracefully close the client connection, waiting for running streams to complete. +// Shutdown gracefully closes the client connection, waiting for running streams to complete. func (cc *http2ClientConn) Shutdown(ctx context.Context) error { if err := cc.sendGoAway(); err != nil { return err @@ -7383,15 +7503,18 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error { func (cc *http2ClientConn) sendGoAway() error { cc.mu.Lock() - defer cc.mu.Unlock() - cc.wmu.Lock() - defer cc.wmu.Unlock() - if cc.closing { + closing := cc.closing + cc.closing = true + maxStreamID := cc.nextStreamID + cc.mu.Unlock() + if closing { // GOAWAY sent already return nil } + + cc.wmu.Lock() + defer cc.wmu.Unlock() // Send a graceful shutdown frame to server - maxStreamID := cc.nextStreamID if err := cc.fr.WriteGoAway(maxStreamID, http2ErrCodeNo, nil); err != nil { return err } @@ -7399,7 +7522,6 @@ func (cc *http2ClientConn) sendGoAway() error { return err } // Prevent new requests - cc.closing = true return nil } @@ -7407,17 +7529,12 @@ func (cc *http2ClientConn) sendGoAway() error { // err is sent to streams. func (cc *http2ClientConn) closeForError(err error) error { cc.mu.Lock() + cc.closed = true + for _, cs := range cc.streams { + cs.abortStreamLocked(err) + } defer cc.cond.Broadcast() defer cc.mu.Unlock() - for id, cs := range cc.streams { - select { - case cs.resc <- http2resAndError{err: err}: - default: - } - cs.bufPipe.CloseWithError(err) - delete(cc.streams, id) - } - cc.closed = true return cc.tconn.Close() } @@ -7435,46 +7552,6 @@ func (cc *http2ClientConn) closeForLostPing() error { return cc.closeForError(err) } -const http2maxAllocFrameSize = 512 << 10 - -// frameBuffer returns a scratch buffer suitable for writing DATA frames. -// They're capped at the min of the peer's max frame size or 512KB -// (kinda arbitrarily), but definitely capped so we don't allocate 4GB -// bufers. -func (cc *http2ClientConn) frameScratchBuffer() []byte { - cc.mu.Lock() - size := cc.maxFrameSize - if size > http2maxAllocFrameSize { - size = http2maxAllocFrameSize - } - for i, buf := range cc.freeBuf { - if len(buf) >= int(size) { - cc.freeBuf[i] = nil - cc.mu.Unlock() - return buf[:size] - } - } - cc.mu.Unlock() - return make([]byte, size) -} - -func (cc *http2ClientConn) putFrameScratchBuffer(buf []byte) { - cc.mu.Lock() - defer cc.mu.Unlock() - const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. - if len(cc.freeBuf) < maxBufs { - cc.freeBuf = append(cc.freeBuf, buf) - return - } - for i, old := range cc.freeBuf { - if old == nil { - cc.freeBuf[i] = buf - return - } - } - // forget about it. -} - // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not // exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests. var http2errRequestCanceled = errors.New("net/http: request canceled") @@ -7517,7 +7594,7 @@ func http2checkConnHeaders(req *Request) error { if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) } - if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) { + if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !http2asciiEqualFold(vv[0], "close") && !http2asciiEqualFold(vv[0], "keep-alive")) { return fmt.Errorf("http2: invalid Connection request header: %q", vv) } return nil @@ -7536,41 +7613,142 @@ func http2actualContentLength(req *Request) int64 { return -1 } +func (cc *http2ClientConn) decrStreamReservations() { + cc.mu.Lock() + defer cc.mu.Unlock() + cc.decrStreamReservationsLocked() +} + +func (cc *http2ClientConn) decrStreamReservationsLocked() { + if cc.streamsReserved > 0 { + cc.streamsReserved-- + } +} + func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { - resp, _, err := cc.roundTrip(req) - return resp, err + ctx := req.Context() + cs := &http2clientStream{ + cc: cc, + ctx: ctx, + reqCancel: req.Cancel, + isHead: req.Method == "HEAD", + reqBody: req.Body, + reqBodyContentLength: http2actualContentLength(req), + trace: httptrace.ContextClientTrace(ctx), + peerClosed: make(chan struct{}), + abort: make(chan struct{}), + respHeaderRecv: make(chan struct{}), + donec: make(chan struct{}), + } + go cs.doRequest(req) + + waitDone := func() error { + select { + case <-cs.donec: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-cs.reqCancel: + return http2errRequestCanceled + } + } + + for { + select { + case <-cs.respHeaderRecv: + res := cs.res + if res.StatusCode > 299 { + // On error or status code 3xx, 4xx, 5xx, etc abort any + // ongoing write, assuming that the server doesn't care + // about our request body. If the server replied with 1xx or + // 2xx, however, then assume the server DOES potentially + // want our body (e.g. full-duplex streaming: + // golang.org/issue/13444). If it turns out the server + // doesn't, they'll RST_STREAM us soon enough. This is a + // heuristic to avoid adding knobs to Transport. Hopefully + // we can keep it. + cs.abortRequestBodyWrite() + } + res.Request = req + res.TLS = cc.tlsState + if res.Body == http2noBody && http2actualContentLength(req) == 0 { + // If there isn't a request or response body still being + // written, then wait for the stream to be closed before + // RoundTrip returns. + if err := waitDone(); err != nil { + return nil, err + } + } + return res, nil + case <-cs.abort: + waitDone() + return nil, cs.abortErr + case <-ctx.Done(): + err := ctx.Err() + cs.abortStream(err) + return nil, err + case <-cs.reqCancel: + cs.abortStream(http2errRequestCanceled) + return nil, http2errRequestCanceled + } + } } -func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterReqBodyWrite bool, err error) { +// writeRequest runs for the duration of the request lifetime. +// +// It sends the request and performs post-request cleanup (closing Request.Body, etc.). +func (cs *http2clientStream) doRequest(req *Request) { + err := cs.writeRequest(req) + cs.cleanupWriteRequest(err) +} + +// writeRequest sends a request. +// +// It returns nil after the request is written, the response read, +// and the request stream is half-closed by the peer. +// +// It returns non-nil if the request ends otherwise. +// If the returned error is StreamError, the error Code may be used in resetting the stream. +func (cs *http2clientStream) writeRequest(req *Request) (err error) { + cc := cs.cc + ctx := cs.ctx + if err := http2checkConnHeaders(req); err != nil { - return nil, false, err - } - if cc.idleTimer != nil { - cc.idleTimer.Stop() + return err } - trailers, err := http2commaSeparatedTrailers(req) - if err != nil { - return nil, false, err + // Acquire the new-request lock by writing to reqHeaderMu. + // This lock guards the critical section covering allocating a new stream ID + // (requires mu) and creating the stream (requires wmu). + if cc.reqHeaderMu == nil { + panic("RoundTrip on uninitialized ClientConn") // for tests + } + select { + case cc.reqHeaderMu <- struct{}{}: + case <-cs.reqCancel: + return http2errRequestCanceled + case <-ctx.Done(): + return ctx.Err() } - hasTrailers := trailers != "" cc.mu.Lock() - if err := cc.awaitOpenSlotForRequest(req); err != nil { + if cc.idleTimer != nil { + cc.idleTimer.Stop() + } + cc.decrStreamReservationsLocked() + if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil { cc.mu.Unlock() - return nil, false, err + <-cc.reqHeaderMu + return err } - - body := req.Body - contentLen := http2actualContentLength(req) - hasBody := contentLen != 0 + cc.addStreamLocked(cs) // assigns stream ID + cc.mu.Unlock() // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? - var requestedGzip bool if !cc.t.disableCompression() && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && - req.Method != "HEAD" { + !cs.isHead { // Request gzip only, not deflate. Deflate is ambiguous and // not as universally supported anyway. // See: https://zlib.net/zlib_faq.html#faq39 @@ -7583,195 +7761,223 @@ func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterRe // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail // anyway. See https://golang.org/issue/8923 - requestedGzip = true + cs.requestedGzip = true } - // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is - // sent by writeRequestBody below, along with any Trailers, - // again in form HEADERS{1}, CONTINUATION{0,}) - hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) - if err != nil { - cc.mu.Unlock() - return nil, false, err + continueTimeout := cc.t.expectContinueTimeout() + if continueTimeout != 0 && + !httpguts.HeaderValuesContainsToken( + req.Header["Expect"], + "100-continue") { + continueTimeout = 0 + cs.on100 = make(chan struct{}, 1) } - cs := cc.newStream() - cs.req = req - cs.trace = httptrace.ContextClientTrace(req.Context()) - cs.requestedGzip = requestedGzip - bodyWriter := cc.t.getBodyWriterState(cs, body) - cs.on100 = bodyWriter.on100 + // Past this point (where we send request headers), it is possible for + // RoundTrip to return successfully. Since the RoundTrip contract permits + // the caller to "mutate or reuse" the Request after closing the Response's Body, + // we must take care when referencing the Request from here on. + err = cs.encodeAndWriteHeaders(req) + <-cc.reqHeaderMu + if err != nil { + return err + } - defer func() { - cc.wmu.Lock() - werr := cc.werr - cc.wmu.Unlock() - if werr != nil { - cc.Close() + hasBody := cs.reqBodyContentLength != 0 + if !hasBody { + cs.sentEndStream = true + } else { + if continueTimeout != 0 { + http2traceWait100Continue(cs.trace) + timer := time.NewTimer(continueTimeout) + select { + case <-timer.C: + err = nil + case <-cs.on100: + err = nil + case <-cs.abort: + err = cs.abortErr + case <-ctx.Done(): + err = ctx.Err() + case <-cs.reqCancel: + err = http2errRequestCanceled + } + timer.Stop() + if err != nil { + http2traceWroteRequest(cs.trace, err) + return err + } } - }() - - cc.wmu.Lock() - endStream := !hasBody && !hasTrailers - werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) - cc.wmu.Unlock() - http2traceWroteHeaders(cs.trace) - cc.mu.Unlock() - if werr != nil { - if hasBody { - req.Body.Close() // per RoundTripper contract - bodyWriter.cancel() + if err = cs.writeRequestBody(req); err != nil { + if err != http2errStopReqBodyWrite { + http2traceWroteRequest(cs.trace, err) + return err + } + } else { + cs.sentEndStream = true } - cc.forgetStreamID(cs.ID) - // Don't bother sending a RST_STREAM (our write already failed; - // no need to keep writing) - http2traceWroteRequest(cs.trace, werr) - return nil, false, werr } + http2traceWroteRequest(cs.trace, err) + var respHeaderTimer <-chan time.Time - if hasBody { - bodyWriter.scheduleBodyWrite() - } else { - http2traceWroteRequest(cs.trace, nil) - if d := cc.responseHeaderTimeout(); d != 0 { - timer := time.NewTimer(d) - defer timer.Stop() - respHeaderTimer = timer.C + var respHeaderRecv chan struct{} + if d := cc.responseHeaderTimeout(); d != 0 { + timer := time.NewTimer(d) + defer timer.Stop() + respHeaderTimer = timer.C + respHeaderRecv = cs.respHeaderRecv + } + // Wait until the peer half-closes its end of the stream, + // or until the request is aborted (via context, error, or otherwise), + // whichever comes first. + for { + select { + case <-cs.peerClosed: + return nil + case <-respHeaderTimer: + return http2errTimeout + case <-respHeaderRecv: + respHeaderTimer = nil // keep waiting for END_STREAM + case <-cs.abort: + return cs.abortErr + case <-ctx.Done(): + return ctx.Err() + case <-cs.reqCancel: + return http2errRequestCanceled } } +} - readLoopResCh := cs.resc - bodyWritten := false - ctx := req.Context() +func (cs *http2clientStream) encodeAndWriteHeaders(req *Request) error { + cc := cs.cc + ctx := cs.ctx - handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) { - res := re.res - if re.err != nil || res.StatusCode > 299 { - // On error or status code 3xx, 4xx, 5xx, etc abort any - // ongoing write, assuming that the server doesn't care - // about our request body. If the server replied with 1xx or - // 2xx, however, then assume the server DOES potentially - // want our body (e.g. full-duplex streaming: - // golang.org/issue/13444). If it turns out the server - // doesn't, they'll RST_STREAM us soon enough. This is a - // heuristic to avoid adding knobs to Transport. Hopefully - // we can keep it. - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWrite) - if hasBody && !bodyWritten { - <-bodyWriter.resc - } - } - if re.err != nil { - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), re.err - } - res.Request = req - res.TLS = cc.tlsState - return res, false, nil + cc.wmu.Lock() + defer cc.wmu.Unlock() + + // If the request was canceled while waiting for cc.mu, just quit. + select { + case <-cs.abort: + return cs.abortErr + case <-ctx.Done(): + return ctx.Err() + case <-cs.reqCancel: + return http2errRequestCanceled + default: } - for { + // Encode headers. + // + // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is + // sent by writeRequestBody below, along with any Trailers, + // again in form HEADERS{1}, CONTINUATION{0,}) + trailers, err := http2commaSeparatedTrailers(req) + if err != nil { + return err + } + hasTrailers := trailers != "" + contentLen := http2actualContentLength(req) + hasBody := contentLen != 0 + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen) + if err != nil { + return err + } + + // Write the request. + endStream := !hasBody && !hasTrailers + cs.sentHeaders = true + err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) + http2traceWroteHeaders(cs.trace) + return err +} + +// cleanupWriteRequest performs post-request tasks. +// +// If err (the result of writeRequest) is non-nil and the stream is not closed, +// cleanupWriteRequest will send a reset to the peer. +func (cs *http2clientStream) cleanupWriteRequest(err error) { + cc := cs.cc + + if cs.ID == 0 { + // We were canceled before creating the stream, so return our reservation. + cc.decrStreamReservations() + } + + // TODO: write h12Compare test showing whether + // Request.Body is closed by the Transport, + // and in multiple cases: server replies <=299 and >299 + // while still writing request body + cc.mu.Lock() + bodyClosed := cs.reqBodyClosed + cs.reqBodyClosed = true + cc.mu.Unlock() + if !bodyClosed && cs.reqBody != nil { + cs.reqBody.Close() + } + + if err != nil && cs.sentEndStream { + // If the connection is closed immediately after the response is read, + // we may be aborted before finishing up here. If the stream was closed + // cleanly on both sides, there is no error. select { - case re := <-readLoopResCh: - return handleReadLoopResponse(re) - case <-respHeaderTimer: - if !hasBody || bodyWritten { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - } else { - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) - <-bodyWriter.resc - } - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), http2errTimeout - case <-ctx.Done(): - if !hasBody || bodyWritten { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - } else { - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) - <-bodyWriter.resc - } - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), ctx.Err() - case <-req.Cancel: - if !hasBody || bodyWritten { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + case <-cs.peerClosed: + err = nil + default: + } + } + if err != nil { + cs.abortStream(err) // possibly redundant, but harmless + if cs.sentHeaders { + if se, ok := err.(http2StreamError); ok { + if se.Cause != http2errFromPeer { + cc.writeStreamReset(cs.ID, se.Code, err) + } } else { - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) - <-bodyWriter.resc - } - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), http2errRequestCanceled - case <-cs.peerReset: - // processResetStream already removed the - // stream from the streams map; no need for - // forgetStreamID. - return nil, cs.getStartedWrite(), cs.resetErr - case err := <-bodyWriter.resc: - bodyWritten = true - // Prefer the read loop's response, if available. Issue 16102. - select { - case re := <-readLoopResCh: - return handleReadLoopResponse(re) - default: - } - if err != nil { - cc.forgetStreamID(cs.ID) - return nil, cs.getStartedWrite(), err - } - if d := cc.responseHeaderTimeout(); d != 0 { - timer := time.NewTimer(d) - defer timer.Stop() - respHeaderTimer = timer.C + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) } } + cs.bufPipe.CloseWithError(err) // no-op if already closed + } else { + if cs.sentHeaders && !cs.sentEndStream { + cc.writeStreamReset(cs.ID, http2ErrCodeNo, nil) + } + cs.bufPipe.CloseWithError(http2errRequestCanceled) + } + if cs.ID != 0 { + cc.forgetStreamID(cs.ID) } + + cc.wmu.Lock() + werr := cc.werr + cc.wmu.Unlock() + if werr != nil { + cc.Close() + } + + close(cs.donec) } -// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. +// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams. // Must hold cc.mu. -func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error { - var waitingForConn chan struct{} - var waitingForConnErr error // guarded by cc.mu +func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error { for { cc.lastActive = time.Now() if cc.closed || !cc.canTakeNewRequestLocked() { - if waitingForConn != nil { - close(waitingForConn) - } return http2errClientConnUnusable } cc.lastIdle = time.Time{} - if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { - if waitingForConn != nil { - close(waitingForConn) - } + if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) { return nil } - // Unfortunately, we cannot wait on a condition variable and channel at - // the same time, so instead, we spin up a goroutine to check if the - // request is canceled while we wait for a slot to open in the connection. - if waitingForConn == nil { - waitingForConn = make(chan struct{}) - go func() { - if err := http2awaitRequestCancel(req, waitingForConn); err != nil { - cc.mu.Lock() - waitingForConnErr = err - cc.cond.Broadcast() - cc.mu.Unlock() - } - }() - } cc.pendingRequests++ cc.cond.Wait() cc.pendingRequests-- - if waitingForConnErr != nil { - return waitingForConnErr + select { + case <-cs.abort: + return cs.abortErr + default: } } } @@ -7798,10 +8004,6 @@ func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, maxFram cc.fr.WriteContinuation(streamID, endHeaders, chunk) } } - // TODO(bradfitz): this Flush could potentially block (as - // could the WriteHeaders call(s) above), which means they - // wouldn't respond to Request.Cancel being readable. That's - // rare, but this should probably be in a goroutine. cc.bw.Flush() return cc.werr } @@ -7817,32 +8019,59 @@ var ( http2errReqBodyTooLong = errors.New("http2: request body larger than specified content length") ) -func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { +// frameScratchBufferLen returns the length of a buffer to use for +// outgoing request bodies to read/write to/from. +// +// It returns max(1, min(peer's advertised max frame size, +// Request.ContentLength+1, 512KB)). +func (cs *http2clientStream) frameScratchBufferLen(maxFrameSize int) int { + const max = 512 << 10 + n := int64(maxFrameSize) + if n > max { + n = max + } + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n { + // Add an extra byte past the declared content-length to + // give the caller's Request.Body io.Reader a chance to + // give us more bytes than they declared, so we can catch it + // early. + n = cl + 1 + } + if n < 1 { + return 1 + } + return int(n) // doesn't truncate; max is 512K +} + +var http2bufPool sync.Pool // of *[]byte + +func (cs *http2clientStream) writeRequestBody(req *Request) (err error) { cc := cs.cc + body := cs.reqBody sentEnd := false // whether we sent the final DATA frame w/ END_STREAM - buf := cc.frameScratchBuffer() - defer cc.putFrameScratchBuffer(buf) - - defer func() { - http2traceWroteRequest(cs.trace, err) - // TODO: write h12Compare test showing whether - // Request.Body is closed by the Transport, - // and in multiple cases: server replies <=299 and >299 - // while still writing request body - cerr := bodyCloser.Close() - if err == nil { - err = cerr - } - }() - req := cs.req hasTrailers := req.Trailer != nil - remainLen := http2actualContentLength(req) + remainLen := cs.reqBodyContentLength hasContentLen := remainLen != -1 + cc.mu.Lock() + maxFrameSize := int(cc.maxFrameSize) + cc.mu.Unlock() + + // Scratch buffer for reading into & writing from. + scratchLen := cs.frameScratchBufferLen(maxFrameSize) + var buf []byte + if bp, ok := http2bufPool.Get().(*[]byte); ok && len(*bp) >= scratchLen { + defer http2bufPool.Put(bp) + buf = *bp + } else { + buf = make([]byte, scratchLen) + defer http2bufPool.Put(&buf) + } + var sawEOF bool for !sawEOF { - n, err := body.Read(buf[:len(buf)-1]) + n, err := body.Read(buf[:len(buf)]) if hasContentLen { remainLen -= int64(n) if remainLen == 0 && err == nil { @@ -7853,13 +8082,13 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos // to send the END_STREAM bit early, double-check that we're actually // at EOF. Subsequent reads should return (0, EOF) at this point. // If either value is different, we return an error in one of two ways below. + var scratch [1]byte var n1 int - n1, err = body.Read(buf[n:]) + n1, err = body.Read(scratch[:]) remainLen -= int64(n1) } if remainLen < 0 { err = http2errReqBodyTooLong - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) return err } } @@ -7867,7 +8096,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos sawEOF = true err = nil } else if err != nil { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) return err } @@ -7875,13 +8103,7 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos for len(remain) > 0 && err == nil { var allowed int32 allowed, err = cs.awaitFlowControl(len(remain)) - switch { - case err == http2errStopReqBodyWrite: - return err - case err == http2errStopReqBodyWriteAndCancel: - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) - return err - case err != nil: + if err != nil { return err } cc.wmu.Lock() @@ -7912,24 +8134,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos return nil } - var trls []byte - if hasTrailers { - cc.mu.Lock() - trls, err = cc.encodeTrailers(req) - cc.mu.Unlock() - if err != nil { - cc.writeStreamReset(cs.ID, http2ErrCodeInternal, err) - cc.forgetStreamID(cs.ID) - return err - } - } - + // Since the RoundTrip contract permits the caller to "mutate or reuse" + // a request after the Response's Body is closed, verify that this hasn't + // happened before accessing the trailers. cc.mu.Lock() - maxFrameSize := int(cc.maxFrameSize) + trailer := req.Trailer + err = cs.abortErr cc.mu.Unlock() + if err != nil { + return err + } cc.wmu.Lock() defer cc.wmu.Unlock() + var trls []byte + if len(trailer) > 0 { + trls, err = cc.encodeTrailers(trailer) + if err != nil { + return err + } + } // Two ways to send END_STREAM: either with trailers, or // with an empty DATA frame. @@ -7950,17 +8174,24 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos // if the stream is dead. func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { cc := cs.cc + ctx := cs.ctx cc.mu.Lock() defer cc.mu.Unlock() for { if cc.closed { return 0, http2errClientConnClosed } - if cs.stopReqBody != nil { - return 0, cs.stopReqBody + if cs.reqBodyClosed { + return 0, http2errStopReqBodyWrite } - if err := cs.checkResetOrDone(); err != nil { - return 0, err + select { + case <-cs.abort: + return 0, cs.abortErr + case <-ctx.Done(): + return 0, ctx.Err() + case <-cs.reqCancel: + return 0, http2errRequestCanceled + default: } if a := cs.flow.available(); a > 0 { take := a @@ -7978,9 +8209,14 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er } } -// requires cc.mu be held. +var http2errNilRequestURL = errors.New("http2: Request.URI is nil") + +// requires cc.wmu be held. func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { cc.hbuf.Reset() + if req.URL == nil { + return nil, http2errNilRequestURL + } host := req.Host if host == "" { @@ -8043,19 +8279,21 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail var didUA bool for k, vv := range req.Header { - if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { + if http2asciiEqualFold(k, "host") || http2asciiEqualFold(k, "content-length") { // Host is :authority, already sent. // Content-Length is automatic, set below. continue - } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || - strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || - strings.EqualFold(k, "keep-alive") { + } else if http2asciiEqualFold(k, "connection") || + http2asciiEqualFold(k, "proxy-connection") || + http2asciiEqualFold(k, "transfer-encoding") || + http2asciiEqualFold(k, "upgrade") || + http2asciiEqualFold(k, "keep-alive") { // Per 8.1.2.2 Connection-Specific Header // Fields, don't send connection-specific // fields. We have already checked if any // are error-worthy so just ignore the rest. continue - } else if strings.EqualFold(k, "user-agent") { + } else if http2asciiEqualFold(k, "user-agent") { // Match Go's http1 behavior: at most one // User-Agent. If set to nil or empty string, // then omit it. Otherwise if not mentioned, @@ -8068,7 +8306,7 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail if vv[0] == "" { continue } - } else if strings.EqualFold(k, "cookie") { + } else if http2asciiEqualFold(k, "cookie") { // Per 8.1.2.5 To allow for better compression efficiency, the // Cookie header field MAY be split into separate header fields, // each with one or more cookie-pairs. @@ -8127,7 +8365,12 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail // Header list size is ok. Write the headers. enumerateHeaders(func(name, value string) { - name = strings.ToLower(name) + name, ascii := http2asciiToLower(name) + if !ascii { + // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header + // field names have to be ASCII characters (just as in HTTP/1.x). + return + } cc.writeHeader(name, value) if traceHeaders { http2traceWroteHeaderField(trace, name, value) @@ -8159,12 +8402,12 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool { } } -// requires cc.mu be held. -func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) { +// requires cc.wmu be held. +func (cc *http2ClientConn) encodeTrailers(trailer Header) ([]byte, error) { cc.hbuf.Reset() hlSize := uint64(0) - for k, vv := range req.Trailer { + for k, vv := range trailer { for _, v := range vv { hf := hpack.HeaderField{Name: k, Value: v} hlSize += uint64(hf.Size()) @@ -8174,10 +8417,15 @@ func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) { return nil, http2errRequestHeaderListSize } - for k, vv := range req.Trailer { + for k, vv := range trailer { + lowKey, ascii := http2asciiToLower(k) + if !ascii { + // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header + // field names have to be ASCII characters (just as in HTTP/1.x). + continue + } // Transfer-Encoding, etc.. have already been filtered at the // start of RoundTrip - lowKey := strings.ToLower(k) for _, v := range vv { cc.writeHeader(lowKey, v) } @@ -8199,51 +8447,51 @@ type http2resAndError struct { } // requires cc.mu be held. -func (cc *http2ClientConn) newStream() *http2clientStream { - cs := &http2clientStream{ - cc: cc, - ID: cc.nextStreamID, - resc: make(chan http2resAndError, 1), - peerReset: make(chan struct{}), - done: make(chan struct{}), - } +func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) { cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) cs.inflow.add(http2transportDefaultStreamFlow) cs.inflow.setConnFlow(&cc.inflow) + cs.ID = cc.nextStreamID cc.nextStreamID += 2 cc.streams[cs.ID] = cs - return cs + if cs.ID == 0 { + panic("assigned stream ID 0") + } } func (cc *http2ClientConn) forgetStreamID(id uint32) { - cc.streamByID(id, true) -} - -func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream { cc.mu.Lock() - defer cc.mu.Unlock() - cs := cc.streams[id] - if andRemove && cs != nil && !cc.closed { - cc.lastActive = time.Now() - delete(cc.streams, id) - if len(cc.streams) == 0 && cc.idleTimer != nil { - cc.idleTimer.Reset(cc.idleTimeout) - cc.lastIdle = time.Now() - } - close(cs.done) - // Wake up checkResetOrDone via clientStream.awaitFlowControl and - // wake up RoundTrip if there is a pending request. - cc.cond.Broadcast() + slen := len(cc.streams) + delete(cc.streams, id) + if len(cc.streams) != slen-1 { + panic("forgetting unknown stream id") + } + cc.lastActive = time.Now() + if len(cc.streams) == 0 && cc.idleTimer != nil { + cc.idleTimer.Reset(cc.idleTimeout) + cc.lastIdle = time.Now() + } + // Wake up writeRequestBody via clientStream.awaitFlowControl and + // wake up RoundTrip if there is a pending request. + cc.cond.Broadcast() + + closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() + if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 { + if http2VerboseLogs { + cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2) + } + cc.closed = true + defer cc.tconn.Close() } - return cs + + cc.mu.Unlock() } // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. type http2clientConnReadLoop struct { - _ http2incomparable - cc *http2ClientConn - closeWhenIdle bool + _ http2incomparable + cc *http2ClientConn } // readLoop runs in its own goroutine and reads and dispatches frames. @@ -8303,23 +8551,22 @@ func (rl *http2clientConnReadLoop) cleanup() { } else if err == io.EOF { err = io.ErrUnexpectedEOF } + cc.closed = true for _, cs := range cc.streams { - cs.bufPipe.CloseWithError(err) // no-op if already closed select { - case cs.resc <- http2resAndError{err: err}: + case <-cs.peerClosed: + // The server closed the stream before closing the conn, + // so no need to interrupt it. default: + cs.abortStreamLocked(err) } - close(cs.done) } - cc.closed = true cc.cond.Broadcast() cc.mu.Unlock() } func (rl *http2clientConnReadLoop) run() error { cc := rl.cc - rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse - gotReply := false // ever saw a HEADERS reply gotSettings := false readIdleTimeout := cc.t.ReadIdleTimeout var t *time.Timer @@ -8336,9 +8583,7 @@ func (rl *http2clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { - if cs := cc.streamByID(se.StreamID, false); cs != nil { - cs.cc.writeStreamReset(cs.ID, se.Code, err) - cs.cc.forgetStreamID(cs.ID) + if cs := rl.streamByID(se.StreamID); cs != nil { if se.Cause == nil { se.Cause = cc.fr.errDetail } @@ -8358,22 +8603,16 @@ func (rl *http2clientConnReadLoop) run() error { } gotSettings = true } - maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *http2MetaHeadersFrame: err = rl.processHeaders(f) - maybeIdle = true - gotReply = true case *http2DataFrame: err = rl.processData(f) - maybeIdle = true case *http2GoAwayFrame: err = rl.processGoAway(f) - maybeIdle = true case *http2RSTStreamFrame: err = rl.processResetStream(f) - maybeIdle = true case *http2SettingsFrame: err = rl.processSettings(f) case *http2PushPromiseFrame: @@ -8391,38 +8630,24 @@ func (rl *http2clientConnReadLoop) run() error { } return err } - if rl.closeWhenIdle && gotReply && maybeIdle { - cc.closeIfIdle() - } } } func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { - cc := rl.cc - cs := cc.streamByID(f.StreamID, false) + cs := rl.streamByID(f.StreamID) if cs == nil { // We'd get here if we canceled a request while the // server had its response still in flight. So if this // was just something we canceled, ignore it. return nil } - if f.StreamEnded() { - // Issue 20521: If the stream has ended, streamByID() causes - // clientStream.done to be closed, which causes the request's bodyWriter - // to be closed with an errStreamClosed, which may be received by - // clientConn.RoundTrip before the result of processing these headers. - // Deferring stream closure allows the header processing to occur first. - // clientConn.RoundTrip may still receive the bodyWriter error first, but - // the fix for issue 16102 prioritises any response. - // - // Issue 22413: If there is no request body, we should close the - // stream before writing to cs.resc so that the stream is closed - // immediately once RoundTrip returns. - if cs.req.Body != nil { - defer cc.forgetStreamID(f.StreamID) - } else { - cc.forgetStreamID(f.StreamID) - } + if cs.readClosed { + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + Cause: errors.New("protocol error: headers after END_STREAM"), + }) + return nil } if !cs.firstByte { if cs.trace != nil { @@ -8446,9 +8671,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro return err } // Any other error type is a stream error. - cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err) - cc.forgetStreamID(cs.ID) - cs.resc <- http2resAndError{err: err} + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + Cause: err, + }) return nil // return nil from process* funcs to keep conn alive } if res == nil { @@ -8456,7 +8683,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro return nil } cs.resTrailer = &res.Trailer - cs.resc <- http2resAndError{res: res} + cs.res = res + close(cs.respHeaderRecv) + if f.StreamEnded() { + rl.endStream(cs) + } return nil } @@ -8518,6 +8749,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http } if statusCode >= 100 && statusCode <= 199 { + if f.StreamEnded() { + return nil, errors.New("1xx informational response with END_STREAM flag") + } cs.num1xx++ const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http if cs.num1xx > max1xxResponses { @@ -8530,40 +8764,47 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http } if statusCode == 100 { http2traceGot100Continue(cs.trace) - if cs.on100 != nil { - cs.on100() // forces any write delay timer to fire + select { + case cs.on100 <- struct{}{}: + default: } } cs.pastHeaders = false // do it all again return nil, nil } - streamEnded := f.StreamEnded() - isHead := cs.req.Method == "HEAD" - if !streamEnded || isHead { - res.ContentLength = -1 - if clens := res.Header["Content-Length"]; len(clens) == 1 { - if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil { - res.ContentLength = int64(cl) - } else { - // TODO: care? unlike http/1, it won't mess up our framing, so it's - // more safe smuggling-wise to ignore. - } - } else if len(clens) > 1 { + res.ContentLength = -1 + if clens := res.Header["Content-Length"]; len(clens) == 1 { + if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil { + res.ContentLength = int64(cl) + } else { // TODO: care? unlike http/1, it won't mess up our framing, so it's // more safe smuggling-wise to ignore. } + } else if len(clens) > 1 { + // TODO: care? unlike http/1, it won't mess up our framing, so it's + // more safe smuggling-wise to ignore. + } else if f.StreamEnded() && !cs.isHead { + res.ContentLength = 0 } - if streamEnded || isHead { + if cs.isHead { res.Body = http2noBody return res, nil } - cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}} + if f.StreamEnded() { + if res.ContentLength > 0 { + res.Body = http2missingBody{} + } else { + res.Body = http2noBody + } + return res, nil + } + + cs.bufPipe.setBuffer(&http2dataBuffer{expected: res.ContentLength}) cs.bytesRemain = res.ContentLength res.Body = http2transportResponseBody{cs} - go cs.awaitRequestCancel(cs.req) if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") @@ -8604,8 +8845,7 @@ func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *htt } // transportResponseBody is the concrete type of Transport.RoundTrip's -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. -// On Close it sends RST_STREAM if EOF wasn't already seen. +// Response.Body. It is an io.ReadCloser. type http2transportResponseBody struct { cs *http2clientStream } @@ -8623,7 +8863,7 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { n = int(cs.bytesRemain) if err == nil { err = errors.New("net/http: server replied with more than declared Content-Length; truncated") - cc.writeStreamReset(cs.ID, http2ErrCodeProtocol, err) + cs.abortStream(err) } cs.readErr = err return int(cs.bytesRemain), err @@ -8641,8 +8881,6 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { } cc.mu.Lock() - defer cc.mu.Unlock() - var connAdd, streamAdd int32 // Check the conn-level first, before the stream-level. if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { @@ -8659,6 +8897,8 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { cs.inflow.add(streamAdd) } } + cc.mu.Unlock() + if connAdd != 0 || streamAdd != 0 { cc.wmu.Lock() defer cc.wmu.Unlock() @@ -8679,34 +8919,42 @@ func (b http2transportResponseBody) Close() error { cs := b.cs cc := cs.cc - serverSentStreamEnd := cs.bufPipe.Err() == io.EOF unread := cs.bufPipe.Len() - - if unread > 0 || !serverSentStreamEnd { + if unread > 0 { cc.mu.Lock() - cc.wmu.Lock() - if !serverSentStreamEnd { - cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) - cs.didReset = true - } // Return connection-level flow control. if unread > 0 { cc.inflow.add(int32(unread)) + } + cc.mu.Unlock() + + // TODO(dneil): Acquiring this mutex can block indefinitely. + // Move flow control return to a goroutine? + cc.wmu.Lock() + // Return connection-level flow control. + if unread > 0 { cc.fr.WriteWindowUpdate(0, uint32(unread)) } cc.bw.Flush() cc.wmu.Unlock() - cc.mu.Unlock() } cs.bufPipe.BreakWithError(http2errClosedResponseBody) - cc.forgetStreamID(cs.ID) + cs.abortStream(http2errClosedResponseBody) + + select { + case <-cs.donec: + case <-cs.ctx.Done(): + return cs.ctx.Err() + case <-cs.reqCancel: + return http2errRequestCanceled + } return nil } func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { cc := rl.cc - cs := cc.streamByID(f.StreamID, f.StreamEnded()) + cs := rl.streamByID(f.StreamID) data := f.Data() if cs == nil { cc.mu.Lock() @@ -8735,6 +8983,14 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { } return nil } + if cs.readClosed { + cc.logf("protocol error: received DATA after END_STREAM") + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + }) + return nil + } if !cs.firstByte { cc.logf("protocol error: received DATA before a HEADERS frame") rl.endStreamError(cs, http2StreamError{ @@ -8744,7 +9000,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { return nil } if f.Length > 0 { - if cs.req.Method == "HEAD" && len(data) > 0 { + if cs.isHead && len(data) > 0 { cc.logf("protocol error: received DATA on a HEAD request") rl.endStreamError(cs, http2StreamError{ StreamID: f.StreamID, @@ -8766,30 +9022,39 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { if pad := int(f.Length) - len(data); pad > 0 { refund += pad } - // Return len(data) now if the stream is already closed, - // since data will never be read. - didReset := cs.didReset - if didReset { - refund += len(data) + + didReset := false + var err error + if len(data) > 0 { + if _, err = cs.bufPipe.Write(data); err != nil { + // Return len(data) now if the stream is already closed, + // since data will never be read. + didReset = true + refund += len(data) + } } + if refund > 0 { cc.inflow.add(int32(refund)) + if !didReset { + cs.inflow.add(int32(refund)) + } + } + cc.mu.Unlock() + + if refund > 0 { cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund)) if !didReset { - cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock() } - cc.mu.Unlock() - if len(data) > 0 && !didReset { - if _, err := cs.bufPipe.Write(data); err != nil { - rl.endStreamError(cs, err) - return err - } + if err != nil { + rl.endStreamError(cs, err) + return nil } } @@ -8802,24 +9067,26 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) { // TODO: check that any declared content-length matches, like // server.go's (*stream).endStream method. - rl.endStreamError(cs, nil) + if !cs.readClosed { + cs.readClosed = true + cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers) + close(cs.peerClosed) + } } func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err error) { - var code func() - if err == nil { - err = io.EOF - code = cs.copyTrailers - } - if http2isConnectionCloseRequest(cs.req) { - rl.closeWhenIdle = true - } - cs.bufPipe.closeWithErrorAndCode(err, code) + cs.readAborted = true + cs.abortStream(err) +} - select { - case cs.resc <- http2resAndError{err: err}: - default: +func (rl *http2clientConnReadLoop) streamByID(id uint32) *http2clientStream { + rl.cc.mu.Lock() + defer rl.cc.mu.Unlock() + cs := rl.cc.streams[id] + if cs != nil && !cs.readAborted { + return cs } + return nil } func (cs *http2clientStream) copyTrailers() { @@ -8845,6 +9112,23 @@ func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error { func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error { cc := rl.cc + // Locking both mu and wmu here allows frame encoding to read settings with only wmu held. + // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless. + cc.wmu.Lock() + defer cc.wmu.Unlock() + + if err := rl.processSettingsNoWrite(f); err != nil { + return err + } + if !f.IsAck() { + cc.fr.WriteSettingsAck() + cc.bw.Flush() + } + return nil +} + +func (rl *http2clientConnReadLoop) processSettingsNoWrite(f *http2SettingsFrame) error { + cc := rl.cc cc.mu.Lock() defer cc.mu.Unlock() @@ -8856,12 +9140,14 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error return http2ConnectionError(http2ErrCodeProtocol) } + var seenMaxConcurrentStreams bool err := f.ForeachSetting(func(s http2Setting) error { switch s.ID { case http2SettingMaxFrameSize: cc.maxFrameSize = s.Val case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val + seenMaxConcurrentStreams = true case http2SettingMaxHeaderListSize: cc.peerMaxHeaderListSize = uint64(s.Val) case http2SettingInitialWindowSize: @@ -8893,17 +9179,23 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error return err } - cc.wmu.Lock() - defer cc.wmu.Unlock() + if !cc.seenSettings { + if !seenMaxConcurrentStreams { + // This was the servers initial SETTINGS frame and it + // didn't contain a MAX_CONCURRENT_STREAMS field so + // increase the number of concurrent streams this + // connection can establish to our default. + cc.maxConcurrentStreams = http2defaultMaxConcurrentStreams + } + cc.seenSettings = true + } - cc.fr.WriteSettingsAck() - cc.bw.Flush() - return cc.werr + return nil } func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error { cc := rl.cc - cs := cc.streamByID(f.StreamID, false) + cs := rl.streamByID(f.StreamID) if f.StreamID != 0 && cs == nil { return nil } @@ -8923,24 +9215,19 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame } func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error { - cs := rl.cc.streamByID(f.StreamID, true) + cs := rl.streamByID(f.StreamID) if cs == nil { // TODO: return error if server tries to RST_STEAM an idle stream return nil } - select { - case <-cs.peerReset: - // Already reset. - // This is the only goroutine - // which closes this, so there - // isn't a race. - default: - err := http2streamError(cs.ID, f.ErrCode) - cs.resetErr = err - close(cs.peerReset) - cs.bufPipe.CloseWithError(err) - cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl + serr := http2streamError(cs.ID, f.ErrCode) + serr.Cause = http2errFromPeer + if f.ErrCode == http2ErrCodeProtocol { + rl.cc.SetDoNotReuse() } + cs.abortStream(serr) + + cs.bufPipe.CloseWithError(serr) return nil } @@ -8962,19 +9249,24 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error { } cc.mu.Unlock() } - cc.wmu.Lock() - if err := cc.fr.WritePing(false, p); err != nil { - cc.wmu.Unlock() - return err - } - if err := cc.bw.Flush(); err != nil { - cc.wmu.Unlock() - return err - } - cc.wmu.Unlock() + errc := make(chan error, 1) + go func() { + cc.wmu.Lock() + defer cc.wmu.Unlock() + if err := cc.fr.WritePing(false, p); err != nil { + errc <- err + return + } + if err := cc.bw.Flush(); err != nil { + errc <- err + return + } + }() select { case <-c: return nil + case err := <-errc: + return err case <-ctx.Done(): return ctx.Err() case <-cc.readerDone: @@ -9051,6 +9343,12 @@ func (t *http2Transport) logf(format string, args ...interface{}) { var http2noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) +type http2missingBody struct{} + +func (http2missingBody) Close() error { return nil } + +func (http2missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF } + func http2strSliceContains(ss []string, s string) bool { for _, v := range ss { if v == s { @@ -9097,87 +9395,6 @@ type http2errorReader struct{ err error } func (r http2errorReader) Read(p []byte) (int, error) { return 0, r.err } -// bodyWriterState encapsulates various state around the Transport's writing -// of the request body, particularly regarding doing delayed writes of the body -// when the request contains "Expect: 100-continue". -type http2bodyWriterState struct { - cs *http2clientStream - timer *time.Timer // if non-nil, we're doing a delayed write - fnonce *sync.Once // to call fn with - fn func() // the code to run in the goroutine, writing the body - resc chan error // result of fn's execution - delay time.Duration // how long we should delay a delayed write for -} - -func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { - s.cs = cs - if body == nil { - return - } - resc := make(chan error, 1) - s.resc = resc - s.fn = func() { - cs.cc.mu.Lock() - cs.startedWrite = true - cs.cc.mu.Unlock() - resc <- cs.writeRequestBody(body, cs.req.Body) - } - s.delay = t.expectContinueTimeout() - if s.delay == 0 || - !httpguts.HeaderValuesContainsToken( - cs.req.Header["Expect"], - "100-continue") { - return - } - s.fnonce = new(sync.Once) - - // Arm the timer with a very large duration, which we'll - // intentionally lower later. It has to be large now because - // we need a handle to it before writing the headers, but the - // s.delay value is defined to not start until after the - // request headers were written. - const hugeDuration = 365 * 24 * time.Hour - s.timer = time.AfterFunc(hugeDuration, func() { - s.fnonce.Do(s.fn) - }) - return -} - -func (s http2bodyWriterState) cancel() { - if s.timer != nil { - if s.timer.Stop() { - s.resc <- nil - } - } -} - -func (s http2bodyWriterState) on100() { - if s.timer == nil { - // If we didn't do a delayed write, ignore the server's - // bogus 100 continue response. - return - } - s.timer.Stop() - go func() { s.fnonce.Do(s.fn) }() -} - -// scheduleBodyWrite starts writing the body, either immediately (in -// the common case) or after the delay timeout. It should not be -// called until after the headers have been written. -func (s http2bodyWriterState) scheduleBodyWrite() { - if s.timer == nil { - // We're not doing a delayed write (see - // getBodyWriterState), so just start the writing - // goroutine immediately. - go s.fn() - return - } - http2traceWait100Continue(s.cs.trace) - if s.timer.Stop() { - s.timer.Reset(s.delay) - } -} - // isConnectionCloseRequest reports whether req should use its own // connection for a single request and then close the connection. func http2isConnectionCloseRequest(req *Request) bool { @@ -9600,7 +9817,12 @@ func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) { } for _, k := range keys { vv := h[k] - k = http2lowerHeader(k) + k, ascii := http2lowerHeader(k) + if !ascii { + // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header + // field names have to be ASCII characters (just as in HTTP/1.x). + continue + } if !http2validWireHeaderFieldName(k) { // Skip it as backup paranoia. Per // golang.org/issue/14048, these should |