mirror of
https://github.com/golang/go
synced 2024-11-19 11:54:57 -07:00
http: support for periodic flushing in ReverseProxy
Fixes #2012 R=golang-dev, dsymonds CC=golang-dev https://golang.org/cl/4662091
This commit is contained in:
parent
7c47741811
commit
ce3c3953be
@ -10,7 +10,10 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ReverseProxy is an HTTP Handler that takes an incoming request and
|
||||
@ -26,6 +29,12 @@ type ReverseProxy struct {
|
||||
// The Transport used to perform proxy requests.
|
||||
// If nil, DefaultTransport is used.
|
||||
Transport RoundTripper
|
||||
|
||||
// FlushInterval specifies the flush interval, in
|
||||
// nanoseconds, to flush to the client while
|
||||
// coping the response body.
|
||||
// If zero, no periodic flushing is done.
|
||||
FlushInterval int64
|
||||
}
|
||||
|
||||
func singleJoiningSlash(a, b string) string {
|
||||
@ -95,6 +104,55 @@ func (p *ReverseProxy) ServeHTTP(rw ResponseWriter, req *Request) {
|
||||
rw.WriteHeader(res.StatusCode)
|
||||
|
||||
if res.Body != nil {
|
||||
io.Copy(rw, res.Body)
|
||||
var dst io.Writer = rw
|
||||
if p.FlushInterval != 0 {
|
||||
if wf, ok := rw.(writeFlusher); ok {
|
||||
dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval}
|
||||
}
|
||||
}
|
||||
io.Copy(dst, res.Body)
|
||||
}
|
||||
}
|
||||
|
||||
type writeFlusher interface {
|
||||
io.Writer
|
||||
Flusher
|
||||
}
|
||||
|
||||
type maxLatencyWriter struct {
|
||||
dst writeFlusher
|
||||
latency int64 // nanos
|
||||
|
||||
lk sync.Mutex // protects init of done, as well Write + Flush
|
||||
done chan bool
|
||||
}
|
||||
|
||||
func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) {
|
||||
m.lk.Lock()
|
||||
defer m.lk.Unlock()
|
||||
if m.done == nil {
|
||||
m.done = make(chan bool)
|
||||
go m.flushLoop()
|
||||
}
|
||||
n, err = m.dst.Write(p)
|
||||
if err != nil {
|
||||
m.done <- true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *maxLatencyWriter) flushLoop() {
|
||||
t := time.NewTicker(m.latency)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
m.lk.Lock()
|
||||
m.dst.Flush()
|
||||
m.lk.Unlock()
|
||||
case <-m.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
panic("unreached")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user