diff options
author | Brad Fitzpatrick <bradfitz@golang.org> | 2016-07-27 00:17:38 +0200 |
---|---|---|
committer | Andrew Gerrand <adg@golang.org> | 2016-07-26 23:04:15 +0000 |
commit | 66b47431cba75ce23630e17c1a3aa000e7b33d06 (patch) | |
tree | c50c763037709712de87b907cbf7f69f9cf2f325 | |
parent | b11fff3886705bd98aec4923f43216ed8e13cb1a (diff) | |
download | go-66b47431cba75ce23630e17c1a3aa000e7b33d06.tar.gz go-66b47431cba75ce23630e17c1a3aa000e7b33d06.zip |
net/http: update bundled http2
Updates x/net/http2 to git rev 6a513af for:
http2: return flow control for closed streams
https://golang.org/cl/25231
http2: make Transport prefer HTTP response header recv before body write error
https://golang.org/cl/24984
http2: make Transport treat "Connection: close" the same as Request.Close
https://golang.org/cl/24982
Fixes golang/go#16481
Change-Id: Iaddb166387ca2df1cfbbf09a166f8605578bec49
Reviewed-on: https://go-review.googlesource.com/25282
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Andrew Gerrand <adg@golang.org>
-rw-r--r-- | src/net/http/h2_bundle.go | 121 |
1 files changed, 95 insertions, 26 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 47e5f577e6..a117897bcf 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -85,7 +85,16 @@ const ( http2noDialOnMiss = false ) -func (p *http2clientConnPool) getClientConn(_ *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) { +func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) { + if http2isConnectionCloseRequest(req) && dialOnMiss { + // It gets its own connection. + const singleUse = true + cc, err := p.t.dialClientConn(addr, singleUse) + if err != nil { + return nil, err + } + return cc, nil + } p.mu.Lock() for _, cc := range p.conns[addr] { if cc.CanTakeNewRequest() { @@ -128,7 +137,8 @@ func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall { // run in its own goroutine. func (c *http2dialCall) dial(addr string) { - c.res, c.err = c.p.t.dialClientConn(addr) + const singleUse = false // shared conn + c.res, c.err = c.p.t.dialClientConn(addr, singleUse) close(c.done) c.p.mu.Lock() @@ -3803,6 +3813,9 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) { } delete(sc.streams, st.id) if p := st.body; p != nil { + + sc.sendWindowUpdate(nil, p.Len()) + p.CloseWithError(err) } st.cw.Close() @@ -3879,17 +3892,24 @@ func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { func (sc *http2serverConn) processData(f *http2DataFrame) error { sc.serveG.check() + data := f.Data() id := f.Header().StreamID st, ok := sc.streams[id] if !ok || st.state != http2stateOpen || st.gotTrailerHeader { + if int(sc.inflow.available()) < len(data) { + return http2StreamError{id, http2ErrCodeFlowControl} + } + + sc.inflow.take(int32(len(data))) + sc.sendWindowUpdate(nil, len(data)) + return http2StreamError{id, http2ErrCodeStreamClosed} } if st.body == nil { panic("internal error: should have a body in this state") } - data := f.Data() if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) @@ -4919,9 +4939,10 @@ func (t *http2Transport) initConnPool() { // ClientConn is the state of a single HTTP/2 client connection to an // HTTP/2 server. type http2ClientConn struct { - t *http2Transport - tconn net.Conn // usually *tls.Conn, except specialized impls - tlsState *tls.ConnectionState // nil only for specialized impls + t *http2Transport + tconn net.Conn // usually *tls.Conn, except specialized impls + tlsState *tls.ConnectionState // nil only for specialized impls + singleUse bool // whether being used for a single http.Request // readLoop goroutine fields: readerDone chan struct{} // closed on error @@ -5117,7 +5138,7 @@ func http2shouldRetryRequest(req *Request, err error) bool { return err == http2errClientConnUnusable } -func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) { +func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { host, _, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -5126,7 +5147,7 @@ func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) { if err != nil { return nil, err } - return t.NewClientConn(tconn) + return t.newClientConn(tconn, singleUse) } func (t *http2Transport) newTLSConfig(host string) *tls.Config { @@ -5187,6 +5208,10 @@ func (t *http2Transport) expectContinueTimeout() time.Duration { } func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { + return t.newClientConn(c, false) +} + +func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { if http2VerboseLogs { t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr()) } @@ -5204,6 +5229,7 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { initialWindowSize: 65535, maxConcurrentStreams: 1000, streams: make(map[uint32]*http2clientStream), + singleUse: singleUse, } cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(http2initialWindowSize)) @@ -5288,6 +5314,9 @@ func (cc *http2ClientConn) CanTakeNewRequest() bool { } func (cc *http2ClientConn) canTakeNewRequestLocked() bool { + if cc.singleUse && cc.nextStreamID > 1 { + return false + } return cc.goAway == nil && !cc.closed && int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && cc.nextStreamID < 2147483647 @@ -5494,22 +5523,26 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { bodyWritten := false ctx := http2reqContext(req) + handleReadLoopResponse := func(re http2resAndError) (*Response, error) { + res := re.res + if re.err != nil || res.StatusCode > 299 { + + bodyWriter.cancel() + cs.abortRequestBodyWrite(http2errStopReqBodyWrite) + } + if re.err != nil { + cc.forgetStreamID(cs.ID) + return nil, re.err + } + res.Request = req + res.TLS = cc.tlsState + return res, nil + } + for { select { case re := <-readLoopResCh: - res := re.res - if re.err != nil || res.StatusCode > 299 { - - bodyWriter.cancel() - cs.abortRequestBodyWrite(http2errStopReqBodyWrite) - } - if re.err != nil { - cc.forgetStreamID(cs.ID) - return nil, re.err - } - res.Request = req - res.TLS = cc.tlsState - return res, nil + return handleReadLoopResponse(re) case <-respHeaderTimer: cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { @@ -5541,6 +5574,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { return nil, cs.resetErr case err := <-bodyWriter.resc: + + select { + case re := <-readLoopResCh: + return handleReadLoopResponse(re) + default: + } if err != nil { return nil, err } @@ -5932,7 +5971,7 @@ func (rl *http2clientConnReadLoop) cleanup() { func (rl *http2clientConnReadLoop) run() error { cc := rl.cc - rl.closeWhenIdle = cc.t.disableKeepAlives() + rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false for { f, err := cc.fr.ReadFrame() @@ -6216,10 +6255,27 @@ var http2errClosedResponseBody = errors.New("http2: response body closed") func (b http2transportResponseBody) Close() error { cs := b.cs - if cs.bufPipe.Err() != io.EOF { + cc := cs.cc - cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + serverSentStreamEnd := cs.bufPipe.Err() == io.EOF + unread := cs.bufPipe.Len() + + if unread > 0 || !serverSentStreamEnd { + cc.mu.Lock() + cc.wmu.Lock() + if !serverSentStreamEnd { + cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) + } + + if unread > 0 { + cc.inflow.add(int32(unread)) + cc.fr.WriteWindowUpdate(0, uint32(unread)) + } + cc.bw.Flush() + cc.wmu.Unlock() + cc.mu.Unlock() } + cs.bufPipe.BreakWithError(http2errClosedResponseBody) return nil } @@ -6227,6 +6283,7 @@ func (b http2transportResponseBody) Close() error { func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) + data := f.Data() if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID @@ -6237,9 +6294,15 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { return http2ConnectionError(http2ErrCodeProtocol) } + if len(data) > 0 { + cc.wmu.Lock() + cc.fr.WriteWindowUpdate(0, uint32(len(data))) + cc.bw.Flush() + cc.wmu.Unlock() + } return nil } - if data := f.Data(); len(data) > 0 { + if len(data) > 0 { if cs.bufPipe.b == nil { cc.logf("http2: Transport received DATA frame for closed stream; closing connection") @@ -6282,7 +6345,7 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err } cs.bufPipe.closeWithErrorAndCode(err, code) delete(rl.activeRes, cs.ID) - if cs.req.Close || cs.req.Header.Get("Connection") == "close" { + if http2isConnectionCloseRequest(cs.req) { rl.closeWhenIdle = true } } @@ -6538,6 +6601,12 @@ func (s http2bodyWriterState) scheduleBodyWrite() { } } +// isConnectionCloseRequest reports whether req should use its own +// connection for a single request and then close the connection. +func http2isConnectionCloseRequest(req *Request) bool { + return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close") +} + // writeFramer is implemented by any type that is used to write frames. type http2writeFramer interface { writeFrame(http2writeContext) error |