aboutsummaryrefslogtreecommitdiff
path: root/src/net
diff options
context:
space:
mode:
authorDmitri Shuralyov <dmitshur@golang.org>2021-11-09 13:31:45 -0500
committerDmitri Shuralyov <dmitshur@golang.org>2021-11-09 20:10:36 +0000
commit77c473f4197b5ad4d90689d665534e598f3c0750 (patch)
tree1558aa65f2b152d848c45c46e38c8895d07e214b /src/net
parent233ea216c730a1902579ab2dea6d752e02d6f6f3 (diff)
downloadgo-77c473f4197b5ad4d90689d665534e598f3c0750.tar.gz
go-77c473f4197b5ad4d90689d665534e598f3c0750.zip
all: update vendored golang.org/x/{net,text} for Go 1.18 release
The Go 1.18 code freeze has recently started. This is a time to update all golang.org/x/... module versions that contribute packages to the std and cmd modules in the standard library to latest master versions. This CL updates only the net, text modules. The next CL will update further ones. For #36905. Change-Id: I9a5ac3cca22da961cfd09f3202e01e1187d42bdd Reviewed-on: https://go-review.googlesource.com/c/go/+/362735 Trust: Dmitri Shuralyov <dmitshur@golang.org> Run-TryBot: Dmitri Shuralyov <dmitshur@golang.org> TryBot-Result: Go Bot <gobot@golang.org> Reviewed-by: Heschi Kreinick <heschi@google.com>
Diffstat (limited to 'src/net')
-rw-r--r--src/net/http/h2_bundle.go368
1 files changed, 255 insertions, 113 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go
index 29226d4065..23a4d15326 100644
--- a/src/net/http/h2_bundle.go
+++ b/src/net/http/h2_bundle.go
@@ -6836,6 +6836,11 @@ type http2Transport struct {
// Defaults to 15s.
PingTimeout time.Duration
+ // WriteByteTimeout is the timeout after which the connection will be
+ // closed no data can be written to it. The timeout begins when data is
+ // available to write, and is extended whenever any bytes are written.
+ WriteByteTimeout time.Duration
+
// CountError, if non-nil, is called on HTTP/2 transport errors.
// It's intended to increment a metric for monitoring, such
// as an expvar or Prometheus metric.
@@ -7006,12 +7011,17 @@ type http2ClientConn struct {
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type http2clientStream struct {
- cc *http2ClientConn
- req *Request
+ cc *http2ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe http2pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@@ -7028,7 +7038,10 @@ type http2clientStream struct {
inflow http2flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@@ -7068,6 +7081,10 @@ func (cs *http2clientStream) abortStreamLocked(err error) {
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@@ -7075,31 +7092,43 @@ func (cs *http2clientStream) abortStreamLocked(err error) {
}
}
-func (cs *http2clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *http2clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}
type http2stickyErrWriter struct {
- w io.Writer
- err *error
+ conn net.Conn
+ timeout time.Duration
+ err *error
}
func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
- n, err = sew.w.Write(p)
- *sew.err = err
- return
+ for {
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
+ }
+ nn, err := sew.conn.Write(p[n:])
+ n += nn
+ if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
+ // Keep extending the deadline so long as we're making progress.
+ continue
+ }
+ if sew.timeout != 0 {
+ sew.conn.SetWriteDeadline(time.Time{})
+ }
+ *sew.err = err
+ return n, err
+ }
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
@@ -7355,7 +7384,11 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
- cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
+ cc.bw = bufio.NewWriter(http2stickyErrWriter{
+ conn: c,
+ timeout: t.WriteByteTimeout,
+ err: &cc.werr,
+ })
cc.br = bufio.NewReader(c)
cc.fr = http2NewFramer(cc.bw, cc.br)
if t.CountError != nil {
@@ -7466,6 +7499,61 @@ func (cc *http2ClientConn) ReserveNewRequest() bool {
return true
}
+// ClientConnState describes the state of a ClientConn.
+type http2ClientConnState struct {
+ // Closed is whether the connection is closed.
+ Closed bool
+
+ // Closing is whether the connection is in the process of
+ // closing. It may be closing due to shutdown, being a
+ // single-use connection, being marked as DoNotReuse, or
+ // having received a GOAWAY frame.
+ Closing bool
+
+ // StreamsActive is how many streams are active.
+ StreamsActive int
+
+ // StreamsReserved is how many streams have been reserved via
+ // ClientConn.ReserveNewRequest.
+ StreamsReserved int
+
+ // StreamsPending is how many requests have been sent in excess
+ // of the peer's advertised MaxConcurrentStreams setting and
+ // are waiting for other streams to complete.
+ StreamsPending int
+
+ // MaxConcurrentStreams is how many concurrent streams the
+ // peer advertised as acceptable. Zero means no SETTINGS
+ // frame has been received yet.
+ MaxConcurrentStreams uint32
+
+ // LastIdle, if non-zero, is when the connection last
+ // transitioned to idle state.
+ LastIdle time.Time
+}
+
+// State returns a snapshot of cc's state.
+func (cc *http2ClientConn) State() http2ClientConnState {
+ cc.wmu.Lock()
+ maxConcurrent := cc.maxConcurrentStreams
+ if !cc.seenSettings {
+ maxConcurrent = 0
+ }
+ cc.wmu.Unlock()
+
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return http2ClientConnState{
+ Closed: cc.closed,
+ Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
+ StreamsActive: len(cc.streams),
+ StreamsReserved: cc.streamsReserved,
+ StreamsPending: cc.pendingRequests,
+ LastIdle: cc.lastIdle,
+ MaxConcurrentStreams: maxConcurrent,
+ }
+}
+
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type http2clientConnIdleState struct {
@@ -7717,15 +7805,19 @@ func (cc *http2ClientConn) decrStreamReservationsLocked() {
func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
ctx := req.Context()
cs := &http2clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
- }
- go cs.doRequest()
+ cc: cc,
+ ctx: ctx,
+ reqCancel: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: http2actualContentLength(req),
+ trace: httptrace.ContextClientTrace(ctx),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
+ }
+ go cs.doRequest(req)
waitDone := func() error {
select {
@@ -7733,7 +7825,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
}
@@ -7752,7 +7844,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -7769,8 +7861,11 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(http2errRequestCanceled)
return nil, http2errRequestCanceled
}
}
@@ -7779,8 +7874,8 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *http2clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *http2clientStream) doRequest(req *Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -7791,12 +7886,11 @@ func (cs *http2clientStream) doRequest() {
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *http2clientStream) writeRequest() (err error) {
+func (cs *http2clientStream) writeRequest(req *Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := http2checkConnHeaders(cs.req); err != nil {
+ if err := http2checkConnHeaders(req); err != nil {
return err
}
@@ -7808,7 +7902,7 @@ func (cs *http2clientStream) writeRequest() (err error) {
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -7831,7 +7925,7 @@ func (cs *http2clientStream) writeRequest() (err error) {
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -7850,19 +7944,23 @@ func (cs *http2clientStream) writeRequest() (err error) {
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := http2actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -7878,7 +7976,7 @@ func (cs *http2clientStream) writeRequest() (err error) {
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = http2errRequestCanceled
}
timer.Stop()
@@ -7888,7 +7986,7 @@ func (cs *http2clientStream) writeRequest() (err error) {
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != http2errStopReqBodyWrite {
http2traceWroteRequest(cs.trace, err)
return err
@@ -7923,16 +8021,15 @@ func (cs *http2clientStream) writeRequest() (err error) {
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
}
}
-func (cs *http2clientStream) encodeAndWriteHeaders() error {
+func (cs *http2clientStream) encodeAndWriteHeaders(req *Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -7943,7 +8040,7 @@ func (cs *http2clientStream) encodeAndWriteHeaders() error {
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return http2errRequestCanceled
default:
}
@@ -7953,14 +8050,14 @@ func (cs *http2clientStream) encodeAndWriteHeaders() error {
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := http2commaSeparatedTrailers(cs.req)
+ trailers, err := http2commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := http2actualContentLength(cs.req)
+ contentLen := http2actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -7979,7 +8076,6 @@ func (cs *http2clientStream) encodeAndWriteHeaders() error {
// cleanupWriteRequest will send a reset to the peer.
func (cs *http2clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -7990,10 +8086,12 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -8027,7 +8125,6 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
if cs.ID != 0 {
cc.forgetStreamID(cs.ID)
}
- close(cs.donec)
cc.wmu.Lock()
werr := cc.werr
@@ -8035,6 +8132,8 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
if werr != nil {
cc.Close()
}
+
+ close(cs.donec)
}
// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
@@ -8108,7 +8207,7 @@ func (cs *http2clientStream) frameScratchBufferLen(maxFrameSize int) int {
if n > max {
n = max
}
- if cl := http2actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -8123,13 +8222,13 @@ func (cs *http2clientStream) frameScratchBufferLen(maxFrameSize int) int {
var http2bufPool sync.Pool // of *[]byte
-func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *http2clientStream) writeRequestBody(req *Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := http2actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -8170,23 +8269,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) {
return err
}
}
- if err == io.EOF {
- sawEOF = true
- err = nil
- } else if err != nil {
- return err
+ if err != nil {
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cc.mu.Unlock()
+ switch {
+ case bodyClosed:
+ return http2errStopReqBodyWrite
+ case err == io.EOF:
+ sawEOF = true
+ err = nil
+ default:
+ return err
+ }
}
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == http2errStopReqBodyWrite:
- return err
- case err == http2errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -8217,16 +8319,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) {
return nil
}
+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
+ cc.mu.Lock()
+ trailer := req.Trailer
+ err = cs.abortErr
+ cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
+
cc.wmu.Lock()
+ defer cc.wmu.Unlock()
var trls []byte
- if hasTrailers {
- trls, err = cc.encodeTrailers(req)
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
if err != nil {
- cc.wmu.Unlock()
return err
}
}
- defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@@ -8247,23 +8359,22 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader) (err error) {
// if the stream is dead.
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, http2errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, http2errRequestCanceled
default:
}
@@ -8477,11 +8588,11 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool {
}
// requires cc.wmu be held.
-func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
+func (cc *http2ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -8491,7 +8602,7 @@ func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
return nil, http2errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := http2asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -8627,7 +8738,13 @@ func (rl *http2clientConnReadLoop) cleanup() {
}
cc.closed = true
for _, cs := range cc.streams {
- cs.abortStreamLocked(err)
+ select {
+ case <-cs.peerClosed:
+ // The server closed the stream before closing the conn,
+ // so no need to interrupt it.
+ default:
+ cs.abortStreamLocked(err)
+ }
}
cc.cond.Broadcast()
cc.mu.Unlock()
@@ -8869,28 +8986,35 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
return nil, nil
}
- streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
- res.ContentLength = -1
- if clens := res.Header["Content-Length"]; len(clens) == 1 {
- if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
- res.ContentLength = int64(cl)
- } else {
- // TODO: care? unlike http/1, it won't mess up our framing, so it's
- // more safe smuggling-wise to ignore.
- }
- } else if len(clens) > 1 {
+ res.ContentLength = -1
+ if clens := res.Header["Content-Length"]; len(clens) == 1 {
+ if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
+ res.ContentLength = int64(cl)
+ } else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
+ } else if len(clens) > 1 {
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
+ } else if f.StreamEnded() && !cs.isHead {
+ res.ContentLength = 0
}
- if streamEnded || isHead {
+ if cs.isHead {
res.Body = http2noBody
return res, nil
}
+ if f.StreamEnded() {
+ if res.ContentLength > 0 {
+ res.Body = http2missingBody{}
+ } else {
+ res.Body = http2noBody
+ }
+ return res, nil
+ }
+
cs.bufPipe.setBuffer(&http2dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
@@ -8934,8 +9058,7 @@ func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *htt
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type http2transportResponseBody struct {
cs *http2clientStream
}
@@ -9018,6 +9141,8 @@ func (b http2transportResponseBody) Close() error {
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -9032,9 +9157,9 @@ func (b http2transportResponseBody) Close() error {
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return http2errRequestCanceled
}
return nil
@@ -9088,7 +9213,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
@@ -9157,6 +9282,12 @@ func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) {
// server.go's (*stream).endStream method.
if !cs.readClosed {
cs.readClosed = true
+ // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
+ // race condition: The caller can read io.EOF from Response.Body
+ // and close the body before we close cs.peerClosed, causing
+ // cleanupWriteRequest to send a RST_STREAM.
+ rl.cc.mu.Lock()
+ defer rl.cc.mu.Unlock()
cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
close(cs.peerClosed)
}
@@ -9344,19 +9475,24 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error {
}
cc.mu.Unlock()
}
- cc.wmu.Lock()
- if err := cc.fr.WritePing(false, p); err != nil {
- cc.wmu.Unlock()
- return err
- }
- if err := cc.bw.Flush(); err != nil {
- cc.wmu.Unlock()
- return err
- }
- cc.wmu.Unlock()
+ errc := make(chan error, 1)
+ go func() {
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+ if err := cc.fr.WritePing(false, p); err != nil {
+ errc <- err
+ return
+ }
+ if err := cc.bw.Flush(); err != nil {
+ errc <- err
+ return
+ }
+ }()
select {
case <-c:
return nil
+ case err := <-errc:
+ return err
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
@@ -9433,6 +9569,12 @@ func (t *http2Transport) logf(format string, args ...interface{}) {
var http2noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+type http2missingBody struct{}
+
+func (http2missingBody) Close() error { return nil }
+
+func (http2missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
+
func http2strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {