aboutsummaryrefslogtreecommitdiff
path: root/src/net/http/httputil/reverseproxy.go
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@golang.org>2018-09-25 16:13:17 +0000
committerBrad Fitzpatrick <bradfitz@golang.org>2018-10-09 15:26:06 +0000
commit5440bfc2ea8c0a4c78d5161605659c07ea10e37a (patch)
tree8115b4555f52a7c50255044f5184156aa3117804 /src/net/http/httputil/reverseproxy.go
parentffc7bc55f351976c9bb1e04aeba75397d40cbb53 (diff)
downloadgo-5440bfc2ea8c0a4c78d5161605659c07ea10e37a.tar.gz
go-5440bfc2ea8c0a4c78d5161605659c07ea10e37a.zip
net/http/httputil: rewrite flushing code, disable on Server-Sent Events
* Rewrite the flushing code to not use a persistent goroutine, which also simplifies testing. * Define the meaning of a negative flush interval. Its meaning doesn't change, but now it's locked in, and then we can use it to optimize the performance of the non-buffered case to avoid use of an AfterFunc. * Support (internal-only) special casing of FlushInterval values per request/response. * For now, treat Server-Sent Event responses as unbuffered. (or rather, immediately flushed from the buffer per-write) Fixes #27816 Change-Id: Ie0f975c997daa3db539504137c741a96d7022665 Reviewed-on: https://go-review.googlesource.com/c/137335 Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Diffstat (limited to 'src/net/http/httputil/reverseproxy.go')
-rw-r--r--src/net/http/httputil/reverseproxy.go89
1 files changed, 57 insertions, 32 deletions
diff --git a/src/net/http/httputil/reverseproxy.go b/src/net/http/httputil/reverseproxy.go
index 1dddaa95a7..1efcbd3bbc 100644
--- a/src/net/http/httputil/reverseproxy.go
+++ b/src/net/http/httputil/reverseproxy.go
@@ -18,10 +18,6 @@ import (
"time"
)
-// onExitFlushLoop is a callback set by tests to detect the state of the
-// flushLoop() goroutine.
-var onExitFlushLoop func()
-
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
@@ -42,6 +38,12 @@ type ReverseProxy struct {
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
+ // A negative value means to flush immediately
+ // after each write to the client.
+ // The FlushInterval is ignored when ReverseProxy
+ // recognizes a response as a streaming response;
+ // for such reponses, writes are flushed to the client
+ // immediately.
FlushInterval time.Duration
// ErrorLog specifies an optional logger for errors
@@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
fl.Flush()
}
}
- err = p.copyResponse(rw, res.Body)
+ err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
if err != nil {
defer res.Body.Close()
// Since we're streaming the response, if we run into an error all we can do
@@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) {
}
}
-func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) error {
- if p.FlushInterval != 0 {
+// flushInterval returns the p.FlushInterval value, conditionally
+// overriding its value for a specific request/response.
+func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time.Duration {
+ resCT := res.Header.Get("Content-Type")
+
+ // For Server-Sent Events responses, flush immediately.
+ // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
+ if resCT == "text/event-stream" {
+ return -1 // negative means immediately
+ }
+
+ // TODO: more specific cases? e.g. res.ContentLength == -1?
+ return p.FlushInterval
+}
+
+func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error {
+ if flushInterval != 0 {
if wf, ok := dst.(writeFlusher); ok {
mlw := &maxLatencyWriter{
dst: wf,
- latency: p.FlushInterval,
- done: make(chan bool),
+ latency: flushInterval,
}
- go mlw.flushLoop()
defer mlw.stop()
dst = mlw
}
@@ -403,34 +418,44 @@ type writeFlusher interface {
type maxLatencyWriter struct {
dst writeFlusher
- latency time.Duration
+ latency time.Duration // non-zero; negative means to flush immediately
- mu sync.Mutex // protects Write + Flush
- done chan bool
+ mu sync.Mutex // protects t, flushPending, and dst.Flush
+ t *time.Timer
+ flushPending bool
}
-func (m *maxLatencyWriter) Write(p []byte) (int, error) {
+func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
m.mu.Lock()
defer m.mu.Unlock()
- return m.dst.Write(p)
+ n, err = m.dst.Write(p)
+ if m.latency < 0 {
+ m.dst.Flush()
+ return
+ }
+ if m.flushPending {
+ return
+ }
+ if m.t == nil {
+ m.t = time.AfterFunc(m.latency, m.delayedFlush)
+ } else {
+ m.t.Reset(m.latency)
+ }
+ m.flushPending = true
+ return
}
-func (m *maxLatencyWriter) flushLoop() {
- t := time.NewTicker(m.latency)
- defer t.Stop()
- for {
- select {
- case <-m.done:
- if onExitFlushLoop != nil {
- onExitFlushLoop()
- }
- return
- case <-t.C:
- m.mu.Lock()
- m.dst.Flush()
- m.mu.Unlock()
- }
- }
+func (m *maxLatencyWriter) delayedFlush() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.dst.Flush()
+ m.flushPending = false
}
-func (m *maxLatencyWriter) stop() { m.done <- true }
+func (m *maxLatencyWriter) stop() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.t != nil {
+ m.t.Stop()
+ }
+}