diff options
author | Brad Fitzpatrick <bradfitz@golang.org> | 2018-09-25 16:13:17 +0000 |
---|---|---|
committer | Brad Fitzpatrick <bradfitz@golang.org> | 2018-10-09 15:26:06 +0000 |
commit | 5440bfc2ea8c0a4c78d5161605659c07ea10e37a (patch) | |
tree | 8115b4555f52a7c50255044f5184156aa3117804 /src/net/http/httputil/reverseproxy.go | |
parent | ffc7bc55f351976c9bb1e04aeba75397d40cbb53 (diff) | |
download | go-5440bfc2ea8c0a4c78d5161605659c07ea10e37a.tar.gz go-5440bfc2ea8c0a4c78d5161605659c07ea10e37a.zip |
net/http/httputil: rewrite flushing code, disable on Server-Sent Events
* Rewrite the flushing code to not use a persistent goroutine, which
also simplifies testing.
* Define the meaning of a negative flush interval. Its meaning doesn't
change, but now it's locked in, and then we can use it to optimize
the performance of the non-buffered case to avoid use of an AfterFunc.
* Support (internal-only) special casing of FlushInterval values per
request/response.
* For now, treat Server-Sent Event responses as unbuffered. (or rather,
immediately flushed from the buffer per-write)
Fixes #27816
Change-Id: Ie0f975c997daa3db539504137c741a96d7022665
Reviewed-on: https://go-review.googlesource.com/c/137335
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Diffstat (limited to 'src/net/http/httputil/reverseproxy.go')
-rw-r--r-- | src/net/http/httputil/reverseproxy.go | 89 |
1 files changed, 57 insertions, 32 deletions
diff --git a/src/net/http/httputil/reverseproxy.go b/src/net/http/httputil/reverseproxy.go index 1dddaa95a7..1efcbd3bbc 100644 --- a/src/net/http/httputil/reverseproxy.go +++ b/src/net/http/httputil/reverseproxy.go @@ -18,10 +18,6 @@ import ( "time" ) -// onExitFlushLoop is a callback set by tests to detect the state of the -// flushLoop() goroutine. -var onExitFlushLoop func() - // ReverseProxy is an HTTP Handler that takes an incoming request and // sends it to another server, proxying the response back to the // client. @@ -42,6 +38,12 @@ type ReverseProxy struct { // to flush to the client while copying the // response body. // If zero, no periodic flushing is done. + // A negative value means to flush immediately + // after each write to the client. + // The FlushInterval is ignored when ReverseProxy + // recognizes a response as a streaming response; + // for such reponses, writes are flushed to the client + // immediately. FlushInterval time.Duration // ErrorLog specifies an optional logger for errors @@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { fl.Flush() } } - err = p.copyResponse(rw, res.Body) + err = p.copyResponse(rw, res.Body, p.flushInterval(req, res)) if err != nil { defer res.Body.Close() // Since we're streaming the response, if we run into an error all we can do @@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) { } } -func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) error { - if p.FlushInterval != 0 { +// flushInterval returns the p.FlushInterval value, conditionally +// overriding its value for a specific request/response. +func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time.Duration { + resCT := res.Header.Get("Content-Type") + + // For Server-Sent Events responses, flush immediately. + // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream + if resCT == "text/event-stream" { + return -1 // negative means immediately + } + + // TODO: more specific cases? e.g. res.ContentLength == -1? + return p.FlushInterval +} + +func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error { + if flushInterval != 0 { if wf, ok := dst.(writeFlusher); ok { mlw := &maxLatencyWriter{ dst: wf, - latency: p.FlushInterval, - done: make(chan bool), + latency: flushInterval, } - go mlw.flushLoop() defer mlw.stop() dst = mlw } @@ -403,34 +418,44 @@ type writeFlusher interface { type maxLatencyWriter struct { dst writeFlusher - latency time.Duration + latency time.Duration // non-zero; negative means to flush immediately - mu sync.Mutex // protects Write + Flush - done chan bool + mu sync.Mutex // protects t, flushPending, and dst.Flush + t *time.Timer + flushPending bool } -func (m *maxLatencyWriter) Write(p []byte) (int, error) { +func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { m.mu.Lock() defer m.mu.Unlock() - return m.dst.Write(p) + n, err = m.dst.Write(p) + if m.latency < 0 { + m.dst.Flush() + return + } + if m.flushPending { + return + } + if m.t == nil { + m.t = time.AfterFunc(m.latency, m.delayedFlush) + } else { + m.t.Reset(m.latency) + } + m.flushPending = true + return } -func (m *maxLatencyWriter) flushLoop() { - t := time.NewTicker(m.latency) - defer t.Stop() - for { - select { - case <-m.done: - if onExitFlushLoop != nil { - onExitFlushLoop() - } - return - case <-t.C: - m.mu.Lock() - m.dst.Flush() - m.mu.Unlock() - } - } +func (m *maxLatencyWriter) delayedFlush() { + m.mu.Lock() + defer m.mu.Unlock() + m.dst.Flush() + m.flushPending = false } -func (m *maxLatencyWriter) stop() { m.done <- true } +func (m *maxLatencyWriter) stop() { + m.mu.Lock() + defer m.mu.Unlock() + if m.t != nil { + m.t.Stop() + } +} |