aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@golang.org>2011-04-04 19:22:47 -0700
committerBrad Fitzpatrick <bradfitz@golang.org>2011-04-04 19:22:47 -0700
commit243266f62e2af7167236f99bb0aa376d3c21246b (patch)
tree6a87a70a2fe2f7cef989d12f460c054f6091b9f5
parentf3ad899a2d50a0d6daaf69bc4b2ce69ee6b85334 (diff)
downloadgo-243266f62e2af7167236f99bb0aa376d3c21246b.tar.gz
go-243266f62e2af7167236f99bb0aa376d3c21246b.zip
http: fix Transport connection re-use race
A connection shouldn't be made available for re-use until its body has been consumed. (except in the case of pipelining, which isn't implemented yet) This CL fixes some issues seen with heavy load against Amazon S3. Subtle implementation detail: to prevent a race with the client requesting a new connection before previous one is returned, we actually have to call putIdleConnection _before_ we return from the final Read/Close call on the http.Response.Body. R=rsc, adg CC=golang-dev https://golang.org/cl/4351048
-rw-r--r--src/pkg/http/transport.go46
-rw-r--r--src/pkg/http/transport_test.go11
2 files changed, 38 insertions, 19 deletions
diff --git a/src/pkg/http/transport.go b/src/pkg/http/transport.go
index fa4120e27a..797d134aa8 100644
--- a/src/pkg/http/transport.go
+++ b/src/pkg/http/transport.go
@@ -424,25 +424,37 @@ func (pc *persistConn) readLoop() {
rc := <-pc.reqch
resp, err := pc.cc.Read(rc.req)
- if err == nil && !rc.req.Close {
- pc.t.putIdleConn(pc)
- }
+
if err == ErrPersistEOF {
// Succeeded, but we can't send any more
// persistent connections on this again. We
// hide this error to upstream callers.
alive = false
err = nil
- } else if err != nil {
+ } else if err != nil || rc.req.Close {
alive = false
}
+
hasBody := resp != nil && resp.ContentLength != 0
+ var waitForBodyRead chan bool
+ if alive {
+ if hasBody {
+ waitForBodyRead = make(chan bool)
+ resp.Body.(*bodyEOFSignal).fn = func() {
+ pc.t.putIdleConn(pc)
+ waitForBodyRead <- true
+ }
+ } else {
+ pc.t.putIdleConn(pc)
+ }
+ }
+
rc.ch <- responseAndError{resp, err}
// Wait for the just-returned response body to be fully consumed
// before we race and peek on the underlying bufio reader.
- if alive && hasBody {
- <-resp.Body.(*bodyEOFSignal).ch
+ if waitForBodyRead != nil {
+ <-waitForBodyRead
}
}
}
@@ -514,33 +526,33 @@ func responseIsKeepAlive(res *Response) bool {
func readResponseWithEOFSignal(r *bufio.Reader, requestMethod string) (resp *Response, err os.Error) {
resp, err = ReadResponse(r, requestMethod)
if err == nil && resp.ContentLength != 0 {
- resp.Body = &bodyEOFSignal{resp.Body, make(chan bool, 1), false}
+ resp.Body = &bodyEOFSignal{resp.Body, nil}
}
return
}
-// bodyEOFSignal wraps a ReadCloser but sends on ch once once
-// the wrapped ReadCloser is fully consumed (including on Close)
+// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
+// once, right before the final Read() or Close() call returns, but after
+// EOF has been seen.
type bodyEOFSignal struct {
body io.ReadCloser
- ch chan bool
- done bool
+ fn func()
}
func (es *bodyEOFSignal) Read(p []byte) (n int, err os.Error) {
n, err = es.body.Read(p)
- if err == os.EOF && !es.done {
- es.ch <- true
- es.done = true
+ if err == os.EOF && es.fn != nil {
+ es.fn()
+ es.fn = nil
}
return
}
func (es *bodyEOFSignal) Close() (err os.Error) {
err = es.body.Close()
- if err == nil && !es.done {
- es.ch <- true
- es.done = true
+ if err == nil && es.fn != nil {
+ es.fn()
+ es.fn = nil
}
return
}
diff --git a/src/pkg/http/transport_test.go b/src/pkg/http/transport_test.go
index 6a5438d9c6..8a77a48549 100644
--- a/src/pkg/http/transport_test.go
+++ b/src/pkg/http/transport_test.go
@@ -85,6 +85,7 @@ func TestTransportConnectionCloseOnResponse(t *testing.T) {
t.Fatalf("error in connectionClose=%v, req #%d, Do: %v", connectionClose, n, err)
}
body, err := ioutil.ReadAll(res.Body)
+ defer res.Body.Close()
if err != nil {
t.Fatalf("error in connectionClose=%v, req #%d, ReadAll: %v", connectionClose, n, err)
}
@@ -154,9 +155,11 @@ func TestTransportIdleCacheKeys(t *testing.T) {
t.Errorf("After CloseIdleConnections expected %d idle conn cache keys; got %d", e, g)
}
- if _, _, err := c.Get(ts.URL); err != nil {
+ resp, _, err := c.Get(ts.URL)
+ if err != nil {
t.Error(err)
}
+ ioutil.ReadAll(resp.Body)
keys := tr.IdleConnKeysForTesting()
if e, g := 1, len(keys); e != g {
@@ -187,7 +190,11 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
// ch)
donech := make(chan bool)
doReq := func() {
- c.Get(ts.URL)
+ resp, _, err := c.Get(ts.URL)
+ if err != nil {
+ t.Error(err)
+ }
+ ioutil.ReadAll(resp.Body)
donech <- true
}
go doReq()