aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitri Shuralyov <dmitshur@golang.org>2021-10-29 14:16:47 -0400
committerDmitri Shuralyov <dmitshur@golang.org>2021-11-01 21:27:46 +0000
commitf9cb33c7c9d48fd0c38d17a659d5e968de648d40 (patch)
treefc81b3e3e469a52d32c8c2a994a98eca200c6898
parent4a842985bf3f71d93a2b1340d9d6685bebc12b6b (diff)
downloadgo-f9cb33c7c9d48fd0c38d17a659d5e968de648d40.tar.gz
go-f9cb33c7c9d48fd0c38d17a659d5e968de648d40.zip
[release-branch.go1.17] net/http: update bundled golang.org/x/net/http2
Pull in approved backports to golang.org/x/net/http2: 95aca89 set ContentLength to -1 for HEAD response with no Content-Length bd5b1b8 set Response.ContentLength to 0 when headers end stream 27001ec don't abort half-closed streams on server connection close f0a8156 on write errors, close ClientConn before returning from RoundTrip 9a182eb deflake TestTransportReqBodyAfterResponse_200 821db7b close the Request's Body when aborting a stream 028e125 return unexpected eof on empty response with non-zero content length 5388f2f don't rely on system TCP buffer sizes in TestServer_MaxQueuedControlFrames fc298ce detect write-blocked PING frames e96ad84 avoid race in TestTransportReqBodyAfterResponse_403. 7f15435 avoid clientConnPool panic when NewClientConn fails 9572bae avoid extra GetConn trace call b04064c refactor request write flow 7e165c9 remove PingTimeout from TestTransportPingWhenReading ef976fc fix Transport connection pool TOCTOU max concurrent stream bug 1d9597c shut down idle Transport connections after protocol errors c173d09 remove check for read-after-close of request bodies 466a463 fix race in DATA frame padding refund 4028c5f avoid blocking while holding ClientConn.mu b91f72d fix off-by-one error in client check for max concurrent streams 21e6c63 close request body after early RoundTrip failures e79adf9 limit client initial MAX_CONCURRENT_STREAMS c0c2bc5 make Transport not reuse conns after a stream protocol error 14c0235 accept zero-length block fragments in HEADERS frames 0d2c43c close the request body if needed 5627bb0 reduce frameScratchBuffer caching aggressiveness c9f4fb0 also set "http/1.1" ALPN in ConfigureServer By doing: $ go get -d golang.org/x/net@internal-branch.go1.17-vendor go get: upgraded golang.org/x/net v0.0.0-20210901185426-6d2eada6345e => v0.0.0-20211101194204-95aca89e93de $ go mod tidy $ go mod vendor $ go generate -run=bundle std Fixes #49077. Fixes #48823. Fixes #48650. Change-Id: Idb972ba5313080626b60b4111d52b80197364ff4 Reviewed-on: https://go-review.googlesource.com/c/go/+/359776 Trust: Dmitri Shuralyov <dmitshur@golang.org> Run-TryBot: Dmitri Shuralyov <dmitshur@golang.org> Reviewed-by: Damien Neil <dneil@google.com> TryBot-Result: Go Bot <gobot@golang.org>
-rw-r--r--src/go.mod2
-rw-r--r--src/go.sum4
-rw-r--r--src/net/http/h2_bundle.go1438
-rw-r--r--src/vendor/modules.txt2
4 files changed, 781 insertions, 665 deletions
diff --git a/src/go.mod b/src/go.mod
index 72d13f1b562..386b51a6569 100644
--- a/src/go.mod
+++ b/src/go.mod
@@ -4,7 +4,7 @@ go 1.17
require (
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e
- golang.org/x/net v0.0.0-20210901185426-6d2eada6345e
+ golang.org/x/net v0.0.0-20211101194204-95aca89e93de
)
require (
diff --git a/src/go.sum b/src/go.sum
index 09d5dddfedb..1f328206ecb 100644
--- a/src/go.sum
+++ b/src/go.sum
@@ -1,8 +1,8 @@
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e h1:8foAy0aoO5GkqCvAEJ4VC4P3zksTg4X4aJCDpZzmgQI=
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
-golang.org/x/net v0.0.0-20210901185426-6d2eada6345e h1:50pLUXxddAhYigPZvsrMVrd+113EKnh8VVRKuSaRviw=
-golang.org/x/net v0.0.0-20210901185426-6d2eada6345e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211101194204-95aca89e93de h1:dKoXPECQZ51dGVSkuiD9YzeNpLT4UPUY4d3xo0sWrkU=
+golang.org/x/net v0.0.0-20211101194204-95aca89e93de/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 h1:yhBbb4IRs2HS9PPlAg6DMC6mUOKexJBNsLf4Z+6En1Q=
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go
index 3445cf550f3..9112079a224 100644
--- a/src/net/http/h2_bundle.go
+++ b/src/net/http/h2_bundle.go
@@ -733,6 +733,12 @@ func http2isBadCipher(cipher uint16) bool {
// ClientConnPool manages a pool of HTTP/2 client connections.
type http2ClientConnPool interface {
+ // GetClientConn returns a specific HTTP/2 connection (usually
+ // a TLS-TCP connection) to an HTTP/2 server. On success, the
+ // returned ClientConn accounts for the upcoming RoundTrip
+ // call, so the caller should not omit it. If the caller needs
+ // to, ClientConn.RoundTrip can be called with a bogus
+ // new(http.Request) to release the stream reservation.
GetClientConn(req *Request, addr string) (*http2ClientConn, error)
MarkDead(*http2ClientConn)
}
@@ -759,7 +765,7 @@ type http2clientConnPool struct {
conns map[string][]*http2ClientConn // key is host:port
dialing map[string]*http2dialCall // currently in-flight dials
keys map[*http2ClientConn][]string
- addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeede calls
+ addConnCalls map[string]*http2addConnCall // in-flight addConnIfNeeded calls
}
func (p *http2clientConnPool) GetClientConn(req *Request, addr string) (*http2ClientConn, error) {
@@ -771,28 +777,8 @@ const (
http2noDialOnMiss = false
)
-// shouldTraceGetConn reports whether getClientConn should call any
-// ClientTrace.GetConn hook associated with the http.Request.
-//
-// This complexity is needed to avoid double calls of the GetConn hook
-// during the back-and-forth between net/http and x/net/http2 (when the
-// net/http.Transport is upgraded to also speak http2), as well as support
-// the case where x/net/http2 is being used directly.
-func (p *http2clientConnPool) shouldTraceGetConn(st http2clientConnIdleState) bool {
- // If our Transport wasn't made via ConfigureTransport, always
- // trace the GetConn hook if provided, because that means the
- // http2 package is being used directly and it's the one
- // dialing, as opposed to net/http.
- if _, ok := p.t.ConnPool.(http2noDialClientConnPool); !ok {
- return true
- }
- // Otherwise, only use the GetConn hook if this connection has
- // been used previously for other requests. For fresh
- // connections, the net/http package does the dialing.
- return !st.freshConn
-}
-
func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) {
+ // TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
if http2isConnectionCloseRequest(req) && dialOnMiss {
// It gets its own connection.
http2traceGetConn(req, addr)
@@ -806,10 +792,14 @@ func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMis
for {
p.mu.Lock()
for _, cc := range p.conns[addr] {
- if st := cc.idleState(); st.canTakeNewRequest {
- if p.shouldTraceGetConn(st) {
+ if cc.ReserveNewRequest() {
+ // When a connection is presented to us by the net/http package,
+ // the GetConn hook has already been called.
+ // Don't call it a second time here.
+ if !cc.getConnCalled {
http2traceGetConn(req, addr)
}
+ cc.getConnCalled = false
p.mu.Unlock()
return cc, nil
}
@@ -825,7 +815,13 @@ func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMis
if http2shouldRetryDial(call, req) {
continue
}
- return call.res, call.err
+ cc, err := call.res, call.err
+ if err != nil {
+ return nil, err
+ }
+ if cc.ReserveNewRequest() {
+ return cc, nil
+ }
}
}
@@ -922,6 +918,7 @@ func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
if err != nil {
c.err = err
} else {
+ cc.getConnCalled = true // already called by the net/http package
p.addConnLocked(key, cc)
}
delete(p.addConnCalls, key)
@@ -1224,6 +1221,11 @@ type http2StreamError struct {
Cause error // optional additional detail
}
+// errFromPeer is a sentinel error value for StreamError.Cause to
+// indicate that the StreamError was sent from the peer over the wire
+// and wasn't locally generated in the Transport.
+var http2errFromPeer = errors.New("received from peer")
+
func http2streamError(id uint32, code http2ErrCode) http2StreamError {
return http2StreamError{StreamID: id, Code: code}
}
@@ -2337,7 +2339,7 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (
return nil, err
}
}
- if len(p)-int(padLength) <= 0 {
+ if len(p)-int(padLength) < 0 {
return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol)
}
hf.headerFragBuf = p[:len(p)-int(padLength)]
@@ -3570,6 +3572,17 @@ type http2pipeBuffer interface {
io.Reader
}
+// setBuffer initializes the pipe buffer.
+// It has no effect if the pipe is already closed.
+func (p *http2pipe) setBuffer(b http2pipeBuffer) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.err != nil || p.breakErr != nil {
+ return
+ }
+ p.b = b
+}
+
func (p *http2pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
@@ -3915,16 +3928,12 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
s.TLSConfig.PreferServerCipherSuites = true
- haveNPN := false
- for _, p := range s.TLSConfig.NextProtos {
- if p == http2NextProtoTLS {
- haveNPN = true
- break
- }
- }
- if !haveNPN {
+ if !http2strSliceContains(s.TLSConfig.NextProtos, http2NextProtoTLS) {
s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, http2NextProtoTLS)
}
+ if !http2strSliceContains(s.TLSConfig.NextProtos, "http/1.1") {
+ s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "http/1.1")
+ }
if s.TLSNextProto == nil {
s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){}
@@ -6666,6 +6675,15 @@ const (
http2transportDefaultStreamMinRefresh = 4 << 10
http2defaultUserAgent = "Go-http-client/2.0"
+
+ // initialMaxConcurrentStreams is a connections maxConcurrentStreams until
+ // it's received servers initial SETTINGS frame, which corresponds with the
+ // spec's minimum recommended value.
+ http2initialMaxConcurrentStreams = 100
+
+ // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams
+ // if the server doesn't include one in its initial SETTINGS frame.
+ http2defaultMaxConcurrentStreams = 1000
)
// Transport is an HTTP/2 Transport.
@@ -6842,11 +6860,12 @@ func (t *http2Transport) initConnPool() {
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type http2ClientConn struct {
- t *http2Transport
- tconn net.Conn // usually *tls.Conn, except specialized impls
- tlsState *tls.ConnectionState // nil only for specialized impls
- reused uint32 // whether conn is being reused; atomic
- singleUse bool // whether being used for a single http.Request
+ t *http2Transport
+ tconn net.Conn // usually *tls.Conn, except specialized impls
+ tlsState *tls.ConnectionState // nil only for specialized impls
+ reused uint32 // whether conn is being reused; atomic
+ singleUse bool // whether being used for a single http.Request
+ getConnCalled bool // used by clientConnPool
// readLoop goroutine fields:
readerDone chan struct{} // closed on error
@@ -6859,87 +6878,94 @@ type http2ClientConn struct {
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow http2flow // our conn-level flow control quota (cs.flow is per stream)
inflow http2flow // peer's conn-level flow control
+ doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
+ seenSettings bool // true if we've seen a settings frame, false otherwise
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *http2GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*http2clientStream // client-initiated
+ streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
- bw *bufio.Writer
br *bufio.Reader
- fr *http2Framer
lastActive time.Time
lastIdle time.Time // time last idle
- // Settings from peer: (also guarded by mu)
+ // Settings from peer: (also guarded by wmu)
maxFrameSize uint32
maxConcurrentStreams uint32
peerMaxHeaderListSize uint64
initialWindowSize uint32
- hbuf bytes.Buffer // HPACK encoder writes into this
- henc *hpack.Encoder
- freeBuf [][]byte
+ // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
+ // Write to reqHeaderMu to lock it, read from it to unlock.
+ // Lock reqmu BEFORE mu or wmu.
+ reqHeaderMu chan struct{}
- wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
- werr error // first write error that has occurred
+ // wmu is held while writing.
+ // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
+ // Only acquire both at the same time when changing peer settings.
+ wmu sync.Mutex
+ bw *bufio.Writer
+ fr *http2Framer
+ werr error // first write error that has occurred
+ hbuf bytes.Buffer // HPACK encoder writes into this
+ henc *hpack.Encoder
}
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type http2clientStream struct {
- cc *http2ClientConn
- req *Request
+ cc *http2ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
- resc chan http2resAndError
bufPipe http2pipe // buffered pipe with the flow-controlled response payload
- startedWrite bool // started request body write; guarded by cc.mu
requestedGzip bool
- on100 func() // optional code to run if get a 100 continue response
+ isHead bool
+
+ abortOnce sync.Once
+ abort chan struct{} // closed to signal stream should end immediately
+ abortErr error // set if abort is closed
+
+ peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
+ donec chan struct{} // closed after the stream is in the closed state
+ on100 chan struct{} // buffered; written to if a 100 is received
+
+ respHeaderRecv chan struct{} // closed when headers are received
+ res *Response // set if respHeaderRecv is closed
flow http2flow // guarded by cc.mu
inflow http2flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
- didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
- peerReset chan struct{} // closed on peer reset
- resetErr error // populated before peerReset is closed
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
- done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
+ // owned by writeRequest:
+ sentEndStream bool // sent an END_STREAM flag to the peer
+ sentHeaders bool
// owned by clientConnReadLoop:
firstByte bool // got the first response byte
pastHeaders bool // got first MetaHeadersFrame (actual headers)
pastTrailers bool // got optional second MetaHeadersFrame (trailers)
num1xx uint8 // number of 1xx responses seen
+ readClosed bool // peer sent an END_STREAM flag
+ readAborted bool // read loop reset the stream
trailer Header // accumulated trailers
resTrailer *Header // client's Response.Trailer
}
-// awaitRequestCancel waits for the user to cancel a request or for the done
-// channel to be signaled. A non-nil error is returned only if the request was
-// canceled.
-func http2awaitRequestCancel(req *Request, done <-chan struct{}) error {
- ctx := req.Context()
- if req.Cancel == nil && ctx.Done() == nil {
- return nil
- }
- select {
- case <-req.Cancel:
- return http2errRequestCanceled
- case <-ctx.Done():
- return ctx.Err()
- case <-done:
- return nil
- }
-}
-
var http2got1xxFuncForTests func(int, textproto.MIMEHeader) error
// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
@@ -6951,59 +6977,37 @@ func (cs *http2clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) e
return http2traceGot1xxResponseFunc(cs.trace)
}
-// awaitRequestCancel waits for the user to cancel a request, its context to
-// expire, or for the request to be done (any way it might be removed from the
-// cc.streams map: peer reset, successful completion, TCP connection breakage,
-// etc). If the request is canceled, then cs will be canceled and closed.
-func (cs *http2clientStream) awaitRequestCancel(req *Request) {
- if err := http2awaitRequestCancel(req, cs.done); err != nil {
- cs.cancelStream()
- cs.bufPipe.CloseWithError(err)
- }
+func (cs *http2clientStream) abortStream(err error) {
+ cs.cc.mu.Lock()
+ defer cs.cc.mu.Unlock()
+ cs.abortStreamLocked(err)
}
-func (cs *http2clientStream) cancelStream() {
- cc := cs.cc
- cc.mu.Lock()
- didReset := cs.didReset
- cs.didReset = true
- cc.mu.Unlock()
-
- if !didReset {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
- cc.forgetStreamID(cs.ID)
+func (cs *http2clientStream) abortStreamLocked(err error) {
+ cs.abortOnce.Do(func() {
+ cs.abortErr = err
+ close(cs.abort)
+ })
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
}
-}
-
-// checkResetOrDone reports any error sent in a RST_STREAM frame by the
-// server, or errStreamClosed if the stream is complete.
-func (cs *http2clientStream) checkResetOrDone() error {
- select {
- case <-cs.peerReset:
- return cs.resetErr
- case <-cs.done:
- return http2errStreamClosed
- default:
- return nil
+ // TODO(dneil): Clean up tests where cs.cc.cond is nil.
+ if cs.cc.cond != nil {
+ // Wake up writeRequestBody if it is waiting on flow control.
+ cs.cc.cond.Broadcast()
}
}
-func (cs *http2clientStream) getStartedWrite() bool {
+func (cs *http2clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
- return cs.startedWrite
-}
-
-func (cs *http2clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ cc.cond.Broadcast()
}
- cc := cs.cc
- cc.mu.Lock()
- cs.stopReqBody = err
- cc.cond.Broadcast()
- cc.mu.Unlock()
}
type http2stickyErrWriter struct {
@@ -7091,9 +7095,9 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
}
reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
http2traceGotConn(req, cc, reused)
- res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
+ res, err := cc.RoundTrip(req)
if err != nil && retry <= 6 {
- if req, err = http2shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
+ if req, err = http2shouldRetryRequest(req, err); err == nil {
// After the first retry, do exponential backoff with 10% jitter.
if retry == 0 {
continue
@@ -7104,7 +7108,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
case <-time.After(time.Second * time.Duration(backoff)):
continue
case <-req.Context().Done():
- return nil, req.Context().Err()
+ err = req.Context().Err()
}
}
}
@@ -7135,7 +7139,7 @@ var (
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
-func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) {
+func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
if !http2canRetryError(err) {
return nil, err
}
@@ -7148,7 +7152,6 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req
// If the request body can be reset back to its original
// state via the optional req.GetBody, do that.
if req.GetBody != nil {
- // TODO: consider a req.Body.Close here? or audit that all caller paths do?
body, err := req.GetBody()
if err != nil {
return nil, err
@@ -7160,10 +7163,8 @@ func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Req
// The Request.Body can't reset back to the beginning, but we
// don't seem to have started to read from it yet, so reuse
- // the request directly. The "afterBodyWrite" means the
- // bodyWrite process has started, which becomes true before
- // the first Read.
- if !afterBodyWrite {
+ // the request directly.
+ if err == http2errClientConnUnusable {
return req, nil
}
@@ -7175,6 +7176,10 @@ func http2canRetryError(err error) bool {
return true
}
if se, ok := err.(http2StreamError); ok {
+ if se.Code == http2ErrCodeProtocol && se.Cause == http2errFromPeer {
+ // See golang/go#47635, golang/go#42777
+ return true
+ }
return se.Code == http2ErrCodeRefusedStream
}
return false
@@ -7249,14 +7254,15 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
- maxFrameSize: 16 << 10, // spec default
- initialWindowSize: 65535, // spec default
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
- peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
+ reqHeaderMu: make(chan struct{}, 1),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
@@ -7326,6 +7332,13 @@ func (cc *http2ClientConn) healthCheck() {
}
}
+// SetDoNotReuse marks cc as not reusable for future HTTP requests.
+func (cc *http2ClientConn) SetDoNotReuse() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ cc.doNotReuse = true
+}
+
func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -7343,27 +7356,39 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
last := f.LastStreamID
for streamID, cs := range cc.streams {
if streamID > last {
- select {
- case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}:
- default:
- }
+ cs.abortStreamLocked(http2errClientConnGotGoAway)
}
}
}
// CanTakeNewRequest reports whether the connection can take a new request,
// meaning it has not been closed or received or sent a GOAWAY.
+//
+// If the caller is going to immediately make a new request on this
+// connection, use ReserveNewRequest instead.
func (cc *http2ClientConn) CanTakeNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.canTakeNewRequestLocked()
}
+// ReserveNewRequest is like CanTakeNewRequest but also reserves a
+// concurrent stream in cc. The reservation is decremented on the
+// next call to RoundTrip.
+func (cc *http2ClientConn) ReserveNewRequest() bool {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if st := cc.idleStateLocked(); !st.canTakeNewRequest {
+ return false
+ }
+ cc.streamsReserved++
+ return true
+}
+
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type http2clientConnIdleState struct {
canTakeNewRequest bool
- freshConn bool // whether it's unused by any previous request
}
func (cc *http2ClientConn) idleState() http2clientConnIdleState {
@@ -7384,13 +7409,13 @@ func (cc *http2ClientConn) idleStateLocked() (st http2clientConnIdleState) {
// writing it.
maxConcurrentOkay = true
} else {
- maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams)
+ maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
}
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
+ !cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
- st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
return
}
@@ -7421,7 +7446,7 @@ func (cc *http2ClientConn) onIdleTimeout() {
func (cc *http2ClientConn) closeIfIdle() {
cc.mu.Lock()
- if len(cc.streams) > 0 {
+ if len(cc.streams) > 0 || cc.streamsReserved > 0 {
cc.mu.Unlock()
return
}
@@ -7436,9 +7461,15 @@ func (cc *http2ClientConn) closeIfIdle() {
cc.tconn.Close()
}
+func (cc *http2ClientConn) isDoNotReuseAndIdle() bool {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return cc.doNotReuse && len(cc.streams) == 0
+}
+
var http2shutdownEnterWaitStateHook = func() {}
-// Shutdown gracefully close the client connection, waiting for running streams to complete.
+// Shutdown gracefully closes the client connection, waiting for running streams to complete.
func (cc *http2ClientConn) Shutdown(ctx context.Context) error {
if err := cc.sendGoAway(); err != nil {
return err
@@ -7477,15 +7508,18 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error {
func (cc *http2ClientConn) sendGoAway() error {
cc.mu.Lock()
- defer cc.mu.Unlock()
- cc.wmu.Lock()
- defer cc.wmu.Unlock()
- if cc.closing {
+ closing := cc.closing
+ cc.closing = true
+ maxStreamID := cc.nextStreamID
+ cc.mu.Unlock()
+ if closing {
// GOAWAY sent already
return nil
}
+
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
// Send a graceful shutdown frame to server
- maxStreamID := cc.nextStreamID
if err := cc.fr.WriteGoAway(maxStreamID, http2ErrCodeNo, nil); err != nil {
return err
}
@@ -7493,7 +7527,6 @@ func (cc *http2ClientConn) sendGoAway() error {
return err
}
// Prevent new requests
- cc.closing = true
return nil
}
@@ -7501,17 +7534,12 @@ func (cc *http2ClientConn) sendGoAway() error {
// err is sent to streams.
func (cc *http2ClientConn) closeForError(err error) error {
cc.mu.Lock()
+ cc.closed = true
+ for _, cs := range cc.streams {
+ cs.abortStreamLocked(err)
+ }
defer cc.cond.Broadcast()
defer cc.mu.Unlock()
- for id, cs := range cc.streams {
- select {
- case cs.resc <- http2resAndError{err: err}:
- default:
- }
- cs.bufPipe.CloseWithError(err)
- delete(cc.streams, id)
- }
- cc.closed = true
return cc.tconn.Close()
}
@@ -7529,46 +7557,6 @@ func (cc *http2ClientConn) closeForLostPing() error {
return cc.closeForError(err)
}
-const http2maxAllocFrameSize = 512 << 10
-
-// frameBuffer returns a scratch buffer suitable for writing DATA frames.
-// They're capped at the min of the peer's max frame size or 512KB
-// (kinda arbitrarily), but definitely capped so we don't allocate 4GB
-// bufers.
-func (cc *http2ClientConn) frameScratchBuffer() []byte {
- cc.mu.Lock()
- size := cc.maxFrameSize
- if size > http2maxAllocFrameSize {
- size = http2maxAllocFrameSize
- }
- for i, buf := range cc.freeBuf {
- if len(buf) >= int(size) {
- cc.freeBuf[i] = nil
- cc.mu.Unlock()
- return buf[:size]
- }
- }
- cc.mu.Unlock()
- return make([]byte, size)
-}
-
-func (cc *http2ClientConn) putFrameScratchBuffer(buf []byte) {
- cc.mu.Lock()
- defer cc.mu.Unlock()
- const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
- if len(cc.freeBuf) < maxBufs {
- cc.freeBuf = append(cc.freeBuf, buf)
- return
- }
- for i, old := range cc.freeBuf {
- if old == nil {
- cc.freeBuf[i] = buf
- return
- }
- }
- // forget about it.
-}
-
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
var http2errRequestCanceled = errors.New("net/http: request canceled")
@@ -7630,41 +7618,142 @@ func http2actualContentLength(req *Request) int64 {
return -1
}
+func (cc *http2ClientConn) decrStreamReservations() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ cc.decrStreamReservationsLocked()
+}
+
+func (cc *http2ClientConn) decrStreamReservationsLocked() {
+ if cc.streamsReserved > 0 {
+ cc.streamsReserved--
+ }
+}
+
func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
- resp, _, err := cc.roundTrip(req)
- return resp, err
+ ctx := req.Context()
+ cs := &http2clientStream{
+ cc: cc,
+ ctx: ctx,
+ reqCancel: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: http2actualContentLength(req),
+ trace: httptrace.ContextClientTrace(ctx),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
+ }
+ go cs.doRequest(req)
+
+ waitDone := func() error {
+ select {
+ case <-cs.donec:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-cs.reqCancel:
+ return http2errRequestCanceled
+ }
+ }
+
+ for {
+ select {
+ case <-cs.respHeaderRecv:
+ res := cs.res
+ if res.StatusCode > 299 {
+ // On error or status code 3xx, 4xx, 5xx, etc abort any
+ // ongoing write, assuming that the server doesn't care
+ // about our request body. If the server replied with 1xx or
+ // 2xx, however, then assume the server DOES potentially
+ // want our body (e.g. full-duplex streaming:
+ // golang.org/issue/13444). If it turns out the server
+ // doesn't, they'll RST_STREAM us soon enough. This is a
+ // heuristic to avoid adding knobs to Transport. Hopefully
+ // we can keep it.
+ cs.abortRequestBodyWrite()
+ }
+ res.Request = req
+ res.TLS = cc.tlsState
+ if res.Body == http2noBody && http2actualContentLength(req) == 0 {
+ // If there isn't a request or response body still being
+ // written, then wait for the stream to be closed before
+ // RoundTrip returns.
+ if err := waitDone(); err != nil {
+ return nil, err
+ }
+ }
+ return res, nil
+ case <-cs.abort:
+ waitDone()
+ return nil, cs.abortErr
+ case <-ctx.Done():
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(http2errRequestCanceled)
+ return nil, http2errRequestCanceled
+ }
+ }
+}
+
+// writeRequest runs for the duration of the request lifetime.
+//
+// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
+func (cs *http2clientStream) doRequest(req *Request) {
+ err := cs.writeRequest(req)
+ cs.cleanupWriteRequest(err)
}
-func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterReqBodyWrite bool, err error) {
+// writeRequest sends a request.
+//
+// It returns nil after the request is written, the response read,
+// and the request stream is half-closed by the peer.
+//
+// It returns non-nil if the request ends otherwise.
+// If the returned error is StreamError, the error Code may be used in resetting the stream.
+func (cs *http2clientStream) writeRequest(req *Request) (err error) {
+ cc := cs.cc
+ ctx := cs.ctx
+
if err := http2checkConnHeaders(req); err != nil {
- return nil, false, err
- }
- if cc.idleTimer != nil {
- cc.idleTimer.Stop()
+ return err
}
- trailers, err := http2commaSeparatedTrailers(req)
- if err != nil {
- return nil, false, err
+ // Acquire the new-request lock by writing to reqHeaderMu.
+ // This lock guards the critical section covering allocating a new stream ID
+ // (requires mu) and creating the stream (requires wmu).
+ if cc.reqHeaderMu == nil {
+ panic("RoundTrip on uninitialized ClientConn") // for tests
+ }
+ select {
+ case cc.reqHeaderMu <- struct{}{}:
+ case <-cs.reqCancel:
+ return http2errRequestCanceled
+ case <-ctx.Done():
+ return ctx.Err()
}
- hasTrailers := trailers != ""
cc.mu.Lock()
- if err := cc.awaitOpenSlotForRequest(req); err != nil {
+ if cc.idleTimer != nil {
+ cc.idleTimer.Stop()
+ }
+ cc.decrStreamReservationsLocked()
+ if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
cc.mu.Unlock()
- return nil, false, err
+ <-cc.reqHeaderMu
+ return err
}
-
- body := req.Body
- contentLen := http2actualContentLength(req)
- hasBody := contentLen != 0
+ cc.addStreamLocked(cs) // assigns stream ID
+ cc.mu.Unlock()
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
- var requestedGzip bool
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -7677,195 +7766,223 @@ func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterRe
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
- requestedGzip = true
+ cs.requestedGzip = true
}
- // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
- // sent by writeRequestBody below, along with any Trailers,
- // again in form HEADERS{1}, CONTINUATION{0,})
- hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
- if err != nil {
- cc.mu.Unlock()
- return nil, false, err
+ continueTimeout := cc.t.expectContinueTimeout()
+ if continueTimeout != 0 &&
+ !httpguts.HeaderValuesContainsToken(
+ req.Header["Expect"],
+ "100-continue") {
+ continueTimeout = 0
+ cs.on100 = make(chan struct{}, 1)
}
- cs := cc.newStream()
- cs.req = req
- cs.trace = httptrace.ContextClientTrace(req.Context())
- cs.requestedGzip = requestedGzip
- bodyWriter := cc.t.getBodyWriterState(cs, body)
- cs.on100 = bodyWriter.on100
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
+ <-cc.reqHeaderMu
+ if err != nil {
+ return err
+ }
- defer func() {
- cc.wmu.Lock()
- werr := cc.werr
- cc.wmu.Unlock()
- if werr != nil {
- cc.Close()
+ hasBody := cs.reqBodyContentLength != 0
+ if !hasBody {
+ cs.sentEndStream = true
+ } else {
+ if continueTimeout != 0 {
+ http2traceWait100Continue(cs.trace)
+ timer := time.NewTimer(continueTimeout)
+ select {
+ case <-timer.C:
+ err = nil
+ case <-cs.on100:
+ err = nil
+ case <-cs.abort:
+ err = cs.abortErr
+ case <-ctx.Done():
+ err = ctx.Err()
+ case <-cs.reqCancel:
+ err = http2errRequestCanceled
+ }
+ timer.Stop()
+ if err != nil {
+ http2traceWroteRequest(cs.trace, err)
+ return err
+ }
}
- }()
- cc.wmu.Lock()
- endStream := !hasBody && !hasTrailers
- werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
- cc.wmu.Unlock()
- http2traceWroteHeaders(cs.trace)
- cc.mu.Unlock()
-
- if werr != nil {
- if hasBody {
- req.Body.Close() // per RoundTripper contract
- bodyWriter.cancel()
+ if err = cs.writeRequestBody(req); err != nil {
+ if err != http2errStopReqBodyWrite {
+ http2traceWroteRequest(cs.trace, err)
+ return err
+ }
+ } else {
+ cs.sentEndStream = true
}
- cc.forgetStreamID(cs.ID)
- // Don't bother sending a RST_STREAM (our write already failed;
- // no need to keep writing)
- http2traceWroteRequest(cs.trace, werr)
- return nil, false, werr
}
+ http2traceWroteRequest(cs.trace, err)
+
var respHeaderTimer <-chan time.Time
- if hasBody {
- bodyWriter.scheduleBodyWrite()
- } else {
- http2traceWroteRequest(cs.trace, nil)
- if d := cc.responseHeaderTimeout(); d != 0 {
- timer := time.NewTimer(d)
- defer timer.Stop()
- respHeaderTimer = timer.C
+ var respHeaderRecv chan struct{}
+ if d := cc.responseHeaderTimeout(); d != 0 {
+ timer := time.NewTimer(d)
+ defer timer.Stop()
+ respHeaderTimer = timer.C
+ respHeaderRecv = cs.respHeaderRecv
+ }
+ // Wait until the peer half-closes its end of the stream,
+ // or until the request is aborted (via context, error, or otherwise),
+ // whichever comes first.
+ for {
+ select {
+ case <-cs.peerClosed:
+ return nil
+ case <-respHeaderTimer:
+ return http2errTimeout
+ case <-respHeaderRecv:
+ respHeaderTimer = nil // keep waiting for END_STREAM
+ case <-cs.abort:
+ return cs.abortErr
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-cs.reqCancel:
+ return http2errRequestCanceled
}
}
+}
- readLoopResCh := cs.resc
- bodyWritten := false
- ctx := req.Context()
+func (cs *http2clientStream) encodeAndWriteHeaders(req *Request) error {
+ cc := cs.cc
+ ctx := cs.ctx
- handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
- res := re.res
- if re.err != nil || res.StatusCode > 299 {
- // On error or status code 3xx, 4xx, 5xx, etc abort any
- // ongoing write, assuming that the server doesn't care
- // about our request body. If the server replied with 1xx or
- // 2xx, however, then assume the server DOES potentially
- // want our body (e.g. full-duplex streaming:
- // golang.org/issue/13444). If it turns out the server
- // doesn't, they'll RST_STREAM us soon enough. This is a
- // heuristic to avoid adding knobs to Transport. Hopefully
- // we can keep it.
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
- if hasBody && !bodyWritten {
- <-bodyWriter.resc
- }
- }
- if re.err != nil {
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), re.err
- }
- res.Request = req
- res.TLS = cc.tlsState
- return res, false, nil
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+
+ // If the request was canceled while waiting for cc.mu, just quit.
+ select {
+ case <-cs.abort:
+ return cs.abortErr
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-cs.reqCancel:
+ return http2errRequestCanceled
+ default:
}
- for {
+ // Encode headers.
+ //
+ // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
+ // sent by writeRequestBody below, along with any Trailers,
+ // again in form HEADERS{1}, CONTINUATION{0,})
+ trailers, err := http2commaSeparatedTrailers(req)
+ if err != nil {
+ return err
+ }
+ hasTrailers := trailers != ""
+ contentLen := http2actualContentLength(req)
+ hasBody := contentLen != 0
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
+ if err != nil {
+ return err
+ }
+
+ // Write the request.
+ endStream := !hasBody && !hasTrailers
+ cs.sentHeaders = true
+ err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
+ http2traceWroteHeaders(cs.trace)
+ return err
+}
+
+// cleanupWriteRequest performs post-request tasks.
+//
+// If err (the result of writeRequest) is non-nil and the stream is not closed,
+// cleanupWriteRequest will send a reset to the peer.
+func (cs *http2clientStream) cleanupWriteRequest(err error) {
+ cc := cs.cc
+
+ if cs.ID == 0 {
+ // We were canceled before creating the stream, so return our reservation.
+ cc.decrStreamReservations()
+ }
+
+ // TODO: write h12Compare test showing whether
+ // Request.Body is closed by the Transport,
+ // and in multiple cases: server replies <=299 and >299
+ // while still writing request body
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
+ }
+
+ if err != nil && cs.sentEndStream {
+ // If the connection is closed immediately after the response is read,
+ // we may be aborted before finishing up here. If the stream was closed
+ // cleanly on both sides, there is no error.
select {
- case re := <-readLoopResCh:
- return handleReadLoopResponse(re)
- case <-respHeaderTimer:
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
- } else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), http2errTimeout
- case <-ctx.Done():
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
- } else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), ctx.Err()
- case <-req.Cancel:
- if !hasBody || bodyWritten {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ case <-cs.peerClosed:
+ err = nil
+ default:
+ }
+ }
+ if err != nil {
+ cs.abortStream(err) // possibly redundant, but harmless
+ if cs.sentHeaders {
+ if se, ok := err.(http2StreamError); ok {
+ if se.Cause != http2errFromPeer {
+ cc.writeStreamReset(cs.ID, se.Code, err)
+ }
} else {
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
- <-bodyWriter.resc
- }
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), http2errRequestCanceled
- case <-cs.peerReset:
- // processResetStream already removed the
- // stream from the streams map; no need for
- // forgetStreamID.
- return nil, cs.getStartedWrite(), cs.resetErr
- case err := <-bodyWriter.resc:
- bodyWritten = true
- // Prefer the read loop's response, if available. Issue 16102.
- select {
- case re := <-readLoopResCh:
- return handleReadLoopResponse(re)
- default:
- }
- if err != nil {
- cc.forgetStreamID(cs.ID)
- return nil, cs.getStartedWrite(), err
- }
- if d := cc.responseHeaderTimeout(); d != 0 {
- timer := time.NewTimer(d)
- defer timer.Stop()
- respHeaderTimer = timer.C
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err)
}
}
+ cs.bufPipe.CloseWithError(err) // no-op if already closed
+ } else {
+ if cs.sentHeaders && !cs.sentEndStream {
+ cc.writeStreamReset(cs.ID, http2ErrCodeNo, nil)
+ }
+ cs.bufPipe.CloseWithError(http2errRequestCanceled)
+ }
+ if cs.ID != 0 {
+ cc.forgetStreamID(cs.ID)
+ }
+
+ cc.wmu.Lock()
+ werr := cc.werr
+ cc.wmu.Unlock()
+ if werr != nil {
+ cc.Close()
}
+
+ close(cs.donec)
}
-// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
-func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error {
- var waitingForConn chan struct{}
- var waitingForConnErr error // guarded by cc.mu
+func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error {
for {
cc.lastActive = time.Now()
if cc.closed || !cc.canTakeNewRequestLocked() {
- if waitingForConn != nil {
- close(waitingForConn)
- }
return http2errClientConnUnusable
}
cc.lastIdle = time.Time{}
- if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
- if waitingForConn != nil {
- close(waitingForConn)
- }
+ if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
return nil
}
- // Unfortunately, we cannot wait on a condition variable and channel at
- // the same time, so instead, we spin up a goroutine to check if the
- // request is canceled while we wait for a slot to open in the connection.
- if waitingForConn == nil {
- waitingForConn = make(chan struct{})
- go func() {
- if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
- cc.mu.Lock()
- waitingForConnErr = err
- cc.cond.Broadcast()
- cc.mu.Unlock()
- }
- }()
- }
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--
- if waitingForConnErr != nil {
- return waitingForConnErr
+ select {
+ case <-cs.abort:
+ return cs.abortErr
+ default:
}
}
}
@@ -7892,10 +8009,6 @@ func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, maxFram
cc.fr.WriteContinuation(streamID, endHeaders, chunk)
}
}
- // TODO(bradfitz): this Flush could potentially block (as
- // could the WriteHeaders call(s) above), which means they
- // wouldn't respond to Request.Cancel being readable. That's
- // rare, but this should probably be in a goroutine.
cc.bw.Flush()
return cc.werr
}
@@ -7911,32 +8024,59 @@ var (
http2errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
-func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
+// frameScratchBufferLen returns the length of a buffer to use for
+// outgoing request bodies to read/write to/from.
+//
+// It returns max(1, min(peer's advertised max frame size,
+// Request.ContentLength+1, 512KB)).
+func (cs *http2clientStream) frameScratchBufferLen(maxFrameSize int) int {
+ const max = 512 << 10
+ n := int64(maxFrameSize)
+ if n > max {
+ n = max
+ }
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
+ // Add an extra byte past the declared content-length to
+ // give the caller's Request.Body io.Reader a chance to
+ // give us more bytes than they declared, so we can catch it
+ // early.
+ n = cl + 1
+ }
+ if n < 1 {
+ return 1
+ }
+ return int(n) // doesn't truncate; max is 512K
+}
+
+var http2bufPool sync.Pool // of *[]byte
+
+func (cs *http2clientStream) writeRequestBody(req *Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- buf := cc.frameScratchBuffer()
- defer cc.putFrameScratchBuffer(buf)
- defer func() {
- http2traceWroteRequest(cs.trace, err)
- // TODO: write h12Compare test showing whether
- // Request.Body is closed by the Transport,
- // and in multiple cases: server replies <=299 and >299
- // while still writing request body
- cerr := bodyCloser.Close()
- if err == nil {
- err = cerr
- }
- }()
-
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := http2actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
+ cc.mu.Lock()
+ maxFrameSize := int(cc.maxFrameSize)
+ cc.mu.Unlock()
+
+ // Scratch buffer for reading into & writing from.
+ scratchLen := cs.frameScratchBufferLen(maxFrameSize)
+ var buf []byte
+ if bp, ok := http2bufPool.Get().(*[]byte); ok && len(*bp) >= scratchLen {
+ defer http2bufPool.Put(bp)
+ buf = *bp
+ } else {
+ buf = make([]byte, scratchLen)
+ defer http2bufPool.Put(&buf)
+ }
+
var sawEOF bool
for !sawEOF {
- n, err := body.Read(buf[:len(buf)-1])
+ n, err := body.Read(buf[:len(buf)])
if hasContentLen {
remainLen -= int64(n)
if remainLen == 0 && err == nil {
@@ -7947,13 +8087,13 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
// to send the END_STREAM bit early, double-check that we're actually
// at EOF. Subsequent reads should return (0, EOF) at this point.
// If either value is different, we return an error in one of two ways below.
+ var scratch [1]byte
var n1 int
- n1, err = body.Read(buf[n:])
+ n1, err = body.Read(scratch[:])
remainLen -= int64(n1)
}
if remainLen < 0 {
err = http2errReqBodyTooLong
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err)
return err
}
}
@@ -7961,7 +8101,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
sawEOF = true
err = nil
} else if err != nil {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err)
return err
}
@@ -7969,13 +8108,7 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == http2errStopReqBodyWrite:
- return err
- case err == http2errStopReqBodyWriteAndCancel:
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -8006,24 +8139,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
return nil
}
- var trls []byte
- if hasTrailers {
- cc.mu.Lock()
- trls, err = cc.encodeTrailers(req)
- cc.mu.Unlock()
- if err != nil {
- cc.writeStreamReset(cs.ID, http2ErrCodeInternal, err)
- cc.forgetStreamID(cs.ID)
- return err
- }
- }
-
+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
cc.mu.Lock()
- maxFrameSize := int(cc.maxFrameSize)
+ trailer := req.Trailer
+ err = cs.abortErr
cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
cc.wmu.Lock()
defer cc.wmu.Unlock()
+ var trls []byte
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
+ if err != nil {
+ return err
+ }
+ }
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@@ -8044,17 +8179,24 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
// if the stream is dead.
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, http2errStopReqBodyWrite
}
- if err := cs.checkResetOrDone(); err != nil {
- return 0, err
+ select {
+ case <-cs.abort:
+ return 0, cs.abortErr
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-cs.reqCancel:
+ return 0, http2errRequestCanceled
+ default:
}
if a := cs.flow.available(); a > 0 {
take := a
@@ -8072,9 +8214,14 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er
}
}
-// requires cc.mu be held.
+var http2errNilRequestURL = errors.New("http2: Request.URI is nil")
+
+// requires cc.wmu be held.
func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
cc.hbuf.Reset()
+ if req.URL == nil {
+ return nil, http2errNilRequestURL
+ }
host := req.Host
if host == "" {
@@ -8260,12 +8407,12 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool {
}
}
-// requires cc.mu be held.
-func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
+// requires cc.wmu be held.
+func (cc *http2ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -8275,7 +8422,7 @@ func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
return nil, http2errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := http2asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -8305,51 +8452,51 @@ type http2resAndError struct {
}
// requires cc.mu be held.
-func (cc *http2ClientConn) newStream() *http2clientStream {
- cs := &http2clientStream{
- cc: cc,
- ID: cc.nextStreamID,
- resc: make(chan http2resAndError, 1),
- peerReset: make(chan struct{}),
- done: make(chan struct{}),
- }
+func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.add(http2transportDefaultStreamFlow)
cs.inflow.setConnFlow(&cc.inflow)
+ cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
- return cs
+ if cs.ID == 0 {
+ panic("assigned stream ID 0")
+ }
}
func (cc *http2ClientConn) forgetStreamID(id uint32) {
- cc.streamByID(id, true)
-}
-
-func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream {
cc.mu.Lock()
- defer cc.mu.Unlock()
- cs := cc.streams[id]
- if andRemove && cs != nil && !cc.closed {
- cc.lastActive = time.Now()
- delete(cc.streams, id)
- if len(cc.streams) == 0 && cc.idleTimer != nil {
- cc.idleTimer.Reset(cc.idleTimeout)
- cc.lastIdle = time.Now()
- }
- close(cs.done)
- // Wake up checkResetOrDone via clientStream.awaitFlowControl and
- // wake up RoundTrip if there is a pending request.
- cc.cond.Broadcast()
+ slen := len(cc.streams)
+ delete(cc.streams, id)
+ if len(cc.streams) != slen-1 {
+ panic("forgetting unknown stream id")
+ }
+ cc.lastActive = time.Now()
+ if len(cc.streams) == 0 && cc.idleTimer != nil {
+ cc.idleTimer.Reset(cc.idleTimeout)
+ cc.lastIdle = time.Now()
+ }
+ // Wake up writeRequestBody via clientStream.awaitFlowControl and
+ // wake up RoundTrip if there is a pending request.
+ cc.cond.Broadcast()
+
+ closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives()
+ if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
+ if http2VerboseLogs {
+ cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
+ }
+ cc.closed = true
+ defer cc.tconn.Close()
}
- return cs
+
+ cc.mu.Unlock()
}
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type http2clientConnReadLoop struct {
- _ http2incomparable
- cc *http2ClientConn
- closeWhenIdle bool
+ _ http2incomparable
+ cc *http2ClientConn
}
// readLoop runs in its own goroutine and reads and dispatches frames.
@@ -8409,23 +8556,22 @@ func (rl *http2clientConnReadLoop) cleanup() {
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
}
+ cc.closed = true
for _, cs := range cc.streams {
- cs.bufPipe.CloseWithError(err) // no-op if already closed
select {
- case cs.resc <- http2resAndError{err: err}:
+ case <-cs.peerClosed:
+ // The server closed the stream before closing the conn,
+ // so no need to interrupt it.
default:
+ cs.abortStreamLocked(err)
}
- close(cs.done)
}
- cc.closed = true
cc.cond.Broadcast()
cc.mu.Unlock()
}
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
- rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
- gotReply := false // ever saw a HEADERS reply
gotSettings := false
readIdleTimeout := cc.t.ReadIdleTimeout
var t *time.Timer
@@ -8442,9 +8588,7 @@ func (rl *http2clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
- if cs := cc.streamByID(se.StreamID, false); cs != nil {
- cs.cc.writeStreamReset(cs.ID, se.Code, err)
- cs.cc.forgetStreamID(cs.ID)
+ if cs := rl.streamByID(se.StreamID); cs != nil {
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
@@ -8464,22 +8608,16 @@ func (rl *http2clientConnReadLoop) run() error {
}
gotSettings = true
}
- maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
err = rl.processHeaders(f)
- maybeIdle = true
- gotReply = true
case *http2DataFrame:
err = rl.processData(f)
- maybeIdle = true
case *http2GoAwayFrame:
err = rl.processGoAway(f)
- maybeIdle = true
case *http2RSTStreamFrame:
err = rl.processResetStream(f)
- maybeIdle = true
case *http2SettingsFrame:
err = rl.processSettings(f)
case *http2PushPromiseFrame:
@@ -8497,38 +8635,24 @@ func (rl *http2clientConnReadLoop) run() error {
}
return err
}
- if rl.closeWhenIdle && gotReply && maybeIdle {
- cc.closeIfIdle()
- }
}
}
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
- cc := rl.cc
- cs := cc.streamByID(f.StreamID, false)
+ cs := rl.streamByID(f.StreamID)
if cs == nil {
// We'd get here if we canceled a request while the
// server had its response still in flight. So if this
// was just something we canceled, ignore it.
return nil
}
- if f.StreamEnded() {
- // Issue 20521: If the stream has ended, streamByID() causes
- // clientStream.done to be closed, which causes the request's bodyWriter
- // to be closed with an errStreamClosed, which may be received by
- // clientConn.RoundTrip before the result of processing these headers.
- // Deferring stream closure allows the header processing to occur first.
- // clientConn.RoundTrip may still receive the bodyWriter error first, but
- // the fix for issue 16102 prioritises any response.
- //
- // Issue 22413: If there is no request body, we should close the
- // stream before writing to cs.resc so that the stream is closed
- // immediately once RoundTrip returns.
- if cs.req.Body != nil {
- defer cc.forgetStreamID(f.StreamID)
- } else {
- cc.forgetStreamID(f.StreamID)
- }
+ if cs.readClosed {
+ rl.endStreamError(cs, http2StreamError{
+ StreamID: f.StreamID,
+ Code: http2ErrCodeProtocol,
+ Cause: errors.New("protocol error: headers after END_STREAM"),
+ })
+ return nil
}
if !cs.firstByte {
if cs.trace != nil {
@@ -8552,9 +8676,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
return err
}
// Any other error type is a stream error.
- cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err)
- cc.forgetStreamID(cs.ID)
- cs.resc <- http2resAndError{err: err}
+ rl.endStreamError(cs, http2StreamError{
+ StreamID: f.StreamID,
+ Code: http2ErrCodeProtocol,
+ Cause: err,
+ })
return nil // return nil from process* funcs to keep conn alive
}
if res == nil {
@@ -8562,7 +8688,11 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
return nil
}
cs.resTrailer = &res.Trailer
- cs.resc <- http2resAndError{res: res}
+ cs.res = res
+ close(cs.respHeaderRecv)
+ if f.StreamEnded() {
+ rl.endStream(cs)
+ }
return nil
}
@@ -8624,6 +8754,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
}
if statusCode >= 100 && statusCode <= 199 {
+ if f.StreamEnded() {
+ return nil, errors.New("1xx informational response with END_STREAM flag")
+ }
cs.num1xx++
const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
if cs.num1xx > max1xxResponses {
@@ -8636,40 +8769,47 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
}
if statusCode == 100 {
http2traceGot100Continue(cs.trace)
- if cs.on100 != nil {
- cs.on100() // forces any write delay timer to fire
+ select {
+ case cs.on100 <- struct{}{}:
+ default:
}
}
cs.pastHeaders = false // do it all again
return nil, nil
}
- streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
- res.ContentLength = -1
- if clens := res.Header["Content-Length"]; len(clens) == 1 {
- if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
- res.ContentLength = int64(cl)
- } else {
- // TODO: care? unlike http/1, it won't mess up our framing, so it's
- // more safe smuggling-wise to ignore.
- }
- } else if len(clens) > 1 {
+ res.ContentLength = -1
+ if clens := res.Header["Content-Length"]; len(clens) == 1 {
+ if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
+ res.ContentLength = int64(cl)
+ } else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
+ } else if len(clens) > 1 {
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
+ } else if f.StreamEnded() && !cs.isHead {
+ res.ContentLength = 0
}
- if streamEnded || isHead {
+ if cs.isHead {
res.Body = http2noBody
return res, nil
}
- cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
+ if f.StreamEnded() {
+ if res.ContentLength > 0 {
+ res.Body = http2missingBody{}
+ } else {
+ res.Body = http2noBody
+ }
+ return res, nil
+ }
+
+ cs.bufPipe.setBuffer(&http2dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
- go cs.awaitRequestCancel(cs.req)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
res.Header.Del("Content-Encoding")
@@ -8710,8 +8850,7 @@ func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *htt
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type http2transportResponseBody struct {
cs *http2clientStream
}
@@ -8729,7 +8868,7 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) {
n = int(cs.bytesRemain)
if err == nil {
err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
- cc.writeStreamReset(cs.ID, http2ErrCodeProtocol, err)
+ cs.abortStream(err)
}
cs.readErr = err
return int(cs.bytesRemain), err
@@ -8747,8 +8886,6 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) {
}
cc.mu.Lock()
- defer cc.mu.Unlock()
-
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
@@ -8765,6 +8902,8 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) {
cs.inflow.add(streamAdd)
}
}
+ cc.mu.Unlock()
+
if connAdd != 0 || streamAdd != 0 {
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -8785,34 +8924,42 @@ func (b http2transportResponseBody) Close() error {
cs := b.cs
cc := cs.cc
- serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
unread := cs.bufPipe.Len()
-
- if unread > 0 || !serverSentStreamEnd {
+ if unread > 0 {
cc.mu.Lock()
- cc.wmu.Lock()
- if !serverSentStreamEnd {
- cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
- cs.didReset = true
- }
// Return connection-level flow control.
if unread > 0 {
cc.inflow.add(int32(unread))
+ }
+ cc.mu.Unlock()
+
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
+ cc.wmu.Lock()
+ // Return connection-level flow control.
+ if unread > 0 {
cc.fr.WriteWindowUpdate(0, uint32(unread))
}
cc.bw.Flush()
cc.wmu.Unlock()
- cc.mu.Unlock()
}
cs.bufPipe.BreakWithError(http2errClosedResponseBody)
- cc.forgetStreamID(cs.ID)
+ cs.abortStream(http2errClosedResponseBody)
+
+ select {
+ case <-cs.donec:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
+ return http2errRequestCanceled
+ }
return nil
}
func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
cc := rl.cc
- cs := cc.streamByID(f.StreamID, f.StreamEnded())
+ cs := rl.streamByID(f.StreamID)
data := f.Data()
if cs == nil {
cc.mu.Lock()
@@ -8841,6 +8988,14 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
}
return nil
}
+ if cs.readClosed {
+ cc.logf("protocol error: received DATA after END_STREAM")
+ rl.endStreamError(cs, http2StreamError{
+ StreamID: f.StreamID,
+ Code: http2ErrCodeProtocol,
+ })
+ return nil
+ }
if !cs.firstByte {
cc.logf("protocol error: received DATA before a HEADERS frame")
rl.endStreamError(cs, http2StreamError{
@@ -8850,7 +9005,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
@@ -8872,30 +9027,39 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
if pad := int(f.Length) - len(data); pad > 0 {
refund += pad
}
- // Return len(data) now if the stream is already closed,
- // since data will never be read.
- didReset := cs.didReset
- if didReset {
- refund += len(data)
+
+ didReset := false
+ var err error
+ if len(data) > 0 {
+ if _, err = cs.bufPipe.Write(data); err != nil {
+ // Return len(data) now if the stream is already closed,
+ // since data will never be read.
+ didReset = true
+ refund += len(data)
+ }
}
+
if refund > 0 {
cc.inflow.add(int32(refund))
+ if !didReset {
+ cs.inflow.add(int32(refund))
+ }
+ }
+ cc.mu.Unlock()
+
+ if refund > 0 {
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {
- cs.inflow.add(int32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
}
cc.bw.Flush()
cc.wmu.Unlock()
}
- cc.mu.Unlock()
- if len(data) > 0 && !didReset {
- if _, err := cs.bufPipe.Write(data); err != nil {
- rl.endStreamError(cs, err)
- return err
- }
+ if err != nil {
+ rl.endStreamError(cs, err)
+ return nil
}
}
@@ -8908,24 +9072,26 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) {
// TODO: check that any declared content-length matches, like
// server.go's (*stream).endStream method.
- rl.endStreamError(cs, nil)
+ if !cs.readClosed {
+ cs.readClosed = true
+ cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
+ close(cs.peerClosed)
+ }
}
func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err error) {
- var code func()
- if err == nil {
- err = io.EOF
- code = cs.copyTrailers
- }
- if http2isConnectionCloseRequest(cs.req) {
- rl.closeWhenIdle = true
- }
- cs.bufPipe.closeWithErrorAndCode(err, code)
+ cs.readAborted = true
+ cs.abortStream(err)
+}
- select {
- case cs.resc <- http2resAndError{err: err}:
- default:
+func (rl *http2clientConnReadLoop) streamByID(id uint32) *http2clientStream {
+ rl.cc.mu.Lock()
+ defer rl.cc.mu.Unlock()
+ cs := rl.cc.streams[id]
+ if cs != nil && !cs.readAborted {
+ return cs
}
+ return nil
}
func (cs *http2clientStream) copyTrailers() {
@@ -8951,6 +9117,23 @@ func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error {
func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error {
cc := rl.cc
+ // Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
+ // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+
+ if err := rl.processSettingsNoWrite(f); err != nil {
+ return err
+ }
+ if !f.IsAck() {
+ cc.fr.WriteSettingsAck()
+ cc.bw.Flush()
+ }
+ return nil
+}
+
+func (rl *http2clientConnReadLoop) processSettingsNoWrite(f *http2SettingsFrame) error {
+ cc := rl.cc
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -8962,12 +9145,14 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
return http2ConnectionError(http2ErrCodeProtocol)
}
+ var seenMaxConcurrentStreams bool
err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
+ seenMaxConcurrentStreams = true
case http2SettingMaxHeaderListSize:
cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
@@ -8999,17 +9184,23 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
return err
}
- cc.wmu.Lock()
- defer cc.wmu.Unlock()
+ if !cc.seenSettings {
+ if !seenMaxConcurrentStreams {
+ // This was the servers initial SETTINGS frame and it
+ // didn't contain a MAX_CONCURRENT_STREAMS field so
+ // increase the number of concurrent streams this
+ // connection can establish to our default.
+ cc.maxConcurrentStreams = http2defaultMaxConcurrentStreams
+ }
+ cc.seenSettings = true
+ }
- cc.fr.WriteSettingsAck()
- cc.bw.Flush()
- return cc.werr
+ return nil
}
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
cc := rl.cc
- cs := cc.streamByID(f.StreamID, false)
+ cs := rl.streamByID(f.StreamID)
if f.StreamID != 0 && cs == nil {
return nil
}
@@ -9029,24 +9220,19 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame
}
func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error {
- cs := rl.cc.streamByID(f.StreamID, true)
+ cs := rl.streamByID(f.StreamID)
if cs == nil {
// TODO: return error if server tries to RST_STEAM an idle stream
return nil
}
- select {
- case <-cs.peerReset:
- // Already reset.
- // This is the only goroutine
- // which closes this, so there
- // isn't a race.
- default:
- err := http2streamError(cs.ID, f.ErrCode)
- cs.resetErr = err
- close(cs.peerReset)
- cs.bufPipe.CloseWithError(err)
- cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+ serr := http2streamError(cs.ID, f.ErrCode)
+ serr.Cause = http2errFromPeer
+ if f.ErrCode == http2ErrCodeProtocol {
+ rl.cc.SetDoNotReuse()
}
+ cs.abortStream(serr)
+
+ cs.bufPipe.CloseWithError(serr)
return nil
}
@@ -9068,19 +9254,24 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error {
}
cc.mu.Unlock()
}
- cc.wmu.Lock()
- if err := cc.fr.WritePing(false, p); err != nil {
- cc.wmu.Unlock()
- return err
- }
- if err := cc.bw.Flush(); err != nil {
- cc.wmu.Unlock()
- return err
- }
- cc.wmu.Unlock()
+ errc := make(chan error, 1)
+ go func() {
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+ if err := cc.fr.WritePing(false, p); err != nil {
+ errc <- err
+ return
+ }
+ if err := cc.bw.Flush(); err != nil {
+ errc <- err
+ return
+ }
+ }()
select {
case <-c:
return nil
+ case err := <-errc:
+ return err
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
@@ -9157,6 +9348,12 @@ func (t *http2Transport) logf(format string, args ...interface{}) {
var http2noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+type http2missingBody struct{}
+
+func (http2missingBody) Close() error { return nil }
+
+func (http2missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
+
func http2strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {
@@ -9203,87 +9400,6 @@ type http2errorReader struct{ err error }
func (r http2errorReader) Read(p []byte) (int, error) { return 0, r.err }
-// bodyWriterState encapsulates various state around the Transport's writing
-// of the request body, particularly regarding doing delayed writes of the body
-// when the request contains "Expect: 100-continue".
-type http2bodyWriterState struct {
- cs *http2clientStream
- timer *time.Timer // if non-nil, we're doing a delayed write
- fnonce *sync.Once // to call fn with
- fn func() // the code to run in the goroutine, writing the body
- resc chan error // result of fn's execution
- delay time.Duration // how long we should delay a delayed write for
-}
-
-func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
- s.cs = cs
- if body == nil {
- return
- }
- resc := make(chan error, 1)
- s.resc = resc
- s.fn = func() {
- cs.cc.mu.Lock()
- cs.startedWrite = true
- cs.cc.mu.Unlock()
- resc <- cs.writeRequestBody(body, cs.req.Body)
- }
- s.delay = t.expectContinueTimeout()
- if s.delay == 0 ||
- !httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
- "100-continue") {
- return
- }
- s.fnonce = new(sync.Once)
-
- // Arm the timer with a very large duration, which we'll
- // intentionally lower later. It has to be large now because
- // we need a handle to it before writing the headers, but the
- // s.delay value is defined to not start until after the
- // request headers were written.
- const hugeDuration = 365 * 24 * time.Hour
- s.timer = time.AfterFunc(hugeDuration, func() {
- s.fnonce.Do(s.fn)
- })
- return
-}
-
-func (s http2bodyWriterState) cancel() {
- if s.timer != nil {
- if s.timer.Stop() {
- s.resc <- nil
- }
- }
-}
-
-func (s http2bodyWriterState) on100() {
- if s.timer == nil {
- // If we didn't do a delayed write, ignore the server's
- // bogus 100 continue response.
- return
- }
- s.timer.Stop()
- go func() { s.fnonce.Do(s.fn) }()
-}
-
-// scheduleBodyWrite starts writing the body, either immediately (in
-// the common case) or after the delay timeout. It should not be
-// called until after the headers have been written.
-func (s http2bodyWriterState) scheduleBodyWrite() {
- if s.timer == nil {
- // We're not doing a delayed write (see
- // getBodyWriterState), so just start the writing
- // goroutine immediately.
- go s.fn()
- return
- }
- http2traceWait100Continue(s.cs.trace)
- if s.timer.Stop() {
- s.timer.Reset(s.delay)
- }
-}
-
// isConnectionCloseRequest reports whether req should use its own
// connection for a single request and then close the connection.
func http2isConnectionCloseRequest(req *Request) bool {
diff --git a/src/vendor/modules.txt b/src/vendor/modules.txt
index 70d42ed32c6..f61fc51ba82 100644
--- a/src/vendor/modules.txt
+++ b/src/vendor/modules.txt
@@ -8,7 +8,7 @@ golang.org/x/crypto/curve25519
golang.org/x/crypto/hkdf
golang.org/x/crypto/internal/subtle
golang.org/x/crypto/poly1305
-# golang.org/x/net v0.0.0-20210901185426-6d2eada6345e
+# golang.org/x/net v0.0.0-20211101194204-95aca89e93de
## explicit; go 1.17
golang.org/x/net/dns/dnsmessage
golang.org/x/net/http/httpguts