diff --git a/src/pkg/http/reverseproxy.go b/src/pkg/http/reverseproxy.go index e4ce1e34c7..015f87f246 100644 --- a/src/pkg/http/reverseproxy.go +++ b/src/pkg/http/reverseproxy.go @@ -10,7 +10,10 @@ import ( "io" "log" "net" + "os" "strings" + "sync" + "time" ) // ReverseProxy is an HTTP Handler that takes an incoming request and @@ -26,6 +29,12 @@ type ReverseProxy struct { // The Transport used to perform proxy requests. // If nil, DefaultTransport is used. Transport RoundTripper + + // FlushInterval specifies the flush interval, in + // nanoseconds, to flush to the client while + // coping the response body. + // If zero, no periodic flushing is done. + FlushInterval int64 } func singleJoiningSlash(a, b string) string { @@ -95,6 +104,55 @@ func (p *ReverseProxy) ServeHTTP(rw ResponseWriter, req *Request) { rw.WriteHeader(res.StatusCode) if res.Body != nil { - io.Copy(rw, res.Body) + var dst io.Writer = rw + if p.FlushInterval != 0 { + if wf, ok := rw.(writeFlusher); ok { + dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval} + } + } + io.Copy(dst, res.Body) } } + +type writeFlusher interface { + io.Writer + Flusher +} + +type maxLatencyWriter struct { + dst writeFlusher + latency int64 // nanos + + lk sync.Mutex // protects init of done, as well Write + Flush + done chan bool +} + +func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) { + m.lk.Lock() + defer m.lk.Unlock() + if m.done == nil { + m.done = make(chan bool) + go m.flushLoop() + } + n, err = m.dst.Write(p) + if err != nil { + m.done <- true + } + return +} + +func (m *maxLatencyWriter) flushLoop() { + t := time.NewTicker(m.latency) + defer t.Stop() + for { + select { + case <-t.C: + m.lk.Lock() + m.dst.Flush() + m.lk.Unlock() + case <-m.done: + return + } + } + panic("unreached") +}