From 893d6866213ca539195076f0e3338da99f321c9c Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 29 Sep 2016 18:05:06 -0700 Subject: [PATCH] net/http: update bundled http2, add h2 Transport.IdleConnTimeout tests Updates bundled http2 to x/net git rev a333c53 for: http2: add Transport support for IdleConnTimeout https://golang.org/cl/30075 And add tests. The bundled http2 also includes a change adding a Ping method to http2.ClientConn, but that type isn't exposed in the standard library. Nevertheless, the code gets compiled and adds a dependency on "crypto/rand", requiring an update to go/build's dependency test. Because net/http already depends on crypto/tls, which uses crypto/rand, it's not really a new dependency. Fixes #16808 Change-Id: I1ec8666ea74762f27c70a6f30a366a6647f923f7 Reviewed-on: https://go-review.googlesource.com/30078 Run-TryBot: Brad Fitzpatrick TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- src/go/build/deps_test.go | 11 ++-- src/net/http/export_test.go | 18 +++++++ src/net/http/h2_bundle.go | 93 +++++++++++++++++++++++++++++++++- src/net/http/transport_test.go | 31 ++++++++---- 4 files changed, 137 insertions(+), 16 deletions(-) diff --git a/src/go/build/deps_test.go b/src/go/build/deps_test.go index fb31ac31c3..48e258e087 100644 --- a/src/go/build/deps_test.go +++ b/src/go/build/deps_test.go @@ -376,16 +376,21 @@ var pkgDeps = map[string][]string{ // HTTP, kingpin of dependencies. "net/http": { "L4", "NET", "OS", - "context", "compress/gzip", "container/list", "crypto/tls", - "mime/multipart", "runtime/debug", - "net/http/internal", + "compress/gzip", + "container/list", + "context", + "crypto/rand", + "crypto/tls", "golang_org/x/net/http2/hpack", "golang_org/x/net/idna", "golang_org/x/net/lex/httplex", "golang_org/x/text/unicode/norm", "golang_org/x/text/width", "internal/nettrace", + "mime/multipart", "net/http/httptrace", + "net/http/internal", + "runtime/debug", }, "net/http/internal": {"L4"}, "net/http/httptrace": {"context", "internal/nettrace", "net", "reflect", "time"}, diff --git a/src/net/http/export_test.go b/src/net/http/export_test.go index 7fc3546caa..00824e754c 100644 --- a/src/net/http/export_test.go +++ b/src/net/http/export_test.go @@ -100,6 +100,24 @@ func (t *Transport) IdleConnStrsForTesting() []string { return ret } +func (t *Transport) IdleConnStrsForTesting_h2() []string { + var ret []string + noDialPool := t.h2transport.ConnPool.(http2noDialClientConnPool) + pool := noDialPool.http2clientConnPool + + pool.mu.Lock() + defer pool.mu.Unlock() + + for k, cc := range pool.conns { + for range cc { + ret = append(ret, k) + } + } + + sort.Strings(ret) + return ret +} + func (t *Transport) IdleConnCountForTesting(cacheKey string) int { t.idleMu.Lock() defer t.idleMu.Unlock() diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 33b13db91f..d430f400e0 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -21,6 +21,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/rand" "crypto/tls" "encoding/binary" "errors" @@ -1255,7 +1256,7 @@ func (f *http2Framer) WriteSettings(settings ...http2Setting) error { return f.endWrite() } -// WriteSettings writes an empty SETTINGS frame with the ACK bit set. +// WriteSettingsAck writes an empty SETTINGS frame with the ACK bit set. // // It will perform exactly one Write to the underlying Writer. // It is the caller's responsibility to not call other Write methods concurrently. @@ -2092,6 +2093,13 @@ type http2clientTrace httptrace.ClientTrace func http2reqContext(r *Request) context.Context { return r.Context() } +func (t *http2Transport) idleConnTimeout() time.Duration { + if t.t1 != nil { + return t.t1.IdleConnTimeout + } + return 0 +} + func http2setResponseUncompressed(res *Response) { res.Uncompressed = true } func http2traceGotConn(req *Request, cc *http2ClientConn) { @@ -2146,6 +2154,11 @@ func http2requestTrace(req *Request) *http2clientTrace { return (*http2clientTrace)(trace) } +// Ping sends a PING frame to the server and waits for the ack. +func (cc *http2ClientConn) Ping(ctx context.Context) error { + return cc.ping(ctx) +} + func http2cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() } var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" @@ -5013,6 +5026,9 @@ type http2ClientConn struct { readerDone chan struct{} // closed on error readerErr error // set before readerDone is closed + idleTimeout time.Duration // or 0 for never + idleTimer *time.Timer + mu sync.Mutex // guards following cond *sync.Cond // hold mu; broadcast on flow/closed changes flow http2flow // our conn-level flow control quota (cs.flow is per stream) @@ -5023,6 +5039,7 @@ type http2ClientConn struct { goAwayDebug string // goAway frame's debug data, retained as a string streams map[uint32]*http2clientStream // client-initiated nextStreamID uint32 + pings map[[8]byte]chan struct{} // in flight ping data to notification channel bw *bufio.Writer br *bufio.Reader fr *http2Framer @@ -5293,6 +5310,11 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, + pings: make(map[[8]byte]chan struct{}), + } + if d := t.idleConnTimeout(); d != 0 { + cc.idleTimeout = d + cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) } if http2VerboseLogs { t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) @@ -5365,6 +5387,16 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { cc.nextStreamID < math.MaxInt32 } +// onIdleTimeout is called from a time.AfterFunc goroutine. It will +// only be called when we're idle, but because we're coming from a new +// goroutine, there could be a new request coming in at the same time, +// so this simply calls the synchronized closeIfIdle to shut down this +// connection. The timer could just call closeIfIdle, but this is more +// clear. +func (cc *http2ClientConn) onIdleTimeout() { + cc.closeIfIdle() +} + func (cc *http2ClientConn) closeIfIdle() { cc.mu.Lock() if len(cc.streams) > 0 { @@ -5499,6 +5531,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { if err := http2checkConnHeaders(req); err != nil { return nil, err } + if cc.idleTimer != nil { + cc.idleTimer.Stop() + } trailers, err := http2commaSeparatedTrailers(req) if err != nil { @@ -5848,7 +5883,7 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail cc.writeHeader(":method", req.Method) if req.Method != "CONNECT" { cc.writeHeader(":path", path) - cc.writeHeader(":scheme", "https") + cc.writeHeader(":scheme", req.URL.Scheme) } if trailers != "" { cc.writeHeader("trailer", trailers) @@ -5966,6 +6001,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr if andRemove && cs != nil && !cc.closed { cc.lastActive = time.Now() delete(cc.streams, id) + if len(cc.streams) == 0 && cc.idleTimer != nil { + cc.idleTimer.Reset(cc.idleTimeout) + } close(cs.done) cc.cond.Broadcast() } @@ -6022,6 +6060,10 @@ func (rl *http2clientConnReadLoop) cleanup() { defer cc.t.connPool().MarkDead(cc) defer close(cc.readerDone) + if cc.idleTimer != nil { + cc.idleTimer.Stop() + } + err := cc.readerErr cc.mu.Lock() if cc.goAway != nil && http2isEOFOrNetReadError(err) { @@ -6577,9 +6619,56 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er return nil } +// Ping sends a PING frame to the server and waits for the ack. +// Public implementation is in go17.go and not_go17.go +func (cc *http2ClientConn) ping(ctx http2contextContext) error { + c := make(chan struct{}) + // Generate a random payload + var p [8]byte + for { + if _, err := rand.Read(p[:]); err != nil { + return err + } + cc.mu.Lock() + + if _, found := cc.pings[p]; !found { + cc.pings[p] = c + cc.mu.Unlock() + break + } + cc.mu.Unlock() + } + cc.wmu.Lock() + if err := cc.fr.WritePing(false, p); err != nil { + cc.wmu.Unlock() + return err + } + if err := cc.bw.Flush(); err != nil { + cc.wmu.Unlock() + return err + } + cc.wmu.Unlock() + select { + case <-c: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-cc.readerDone: + + return cc.readerErr + } +} + func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { if f.IsAck() { + cc := rl.cc + cc.mu.Lock() + defer cc.mu.Unlock() + if c, ok := cc.pings[f.Data]; ok { + close(c) + delete(cc.pings, f.Data) + } return nil } cc := rl.cc diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index a77c5fcc39..3051ec9473 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -3481,27 +3481,36 @@ func TestTransportMaxIdleConns(t *testing.T) { } } -func TestTransportIdleConnTimeout(t *testing.T) { +func TestTransportIdleConnTimeout_h1(t *testing.T) { testTransportIdleConnTimeout(t, h1Mode) } +func TestTransportIdleConnTimeout_h2(t *testing.T) { testTransportIdleConnTimeout(t, h2Mode) } +func testTransportIdleConnTimeout(t *testing.T, h2 bool) { if testing.Short() { t.Skip("skipping in short mode") } defer afterTest(t) - ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + const timeout = 1 * time.Second + + cst := newClientServerTest(t, h2, HandlerFunc(func(w ResponseWriter, r *Request) { // No body for convenience. })) - defer ts.Close() - - const timeout = 1 * time.Second - tr := &Transport{ - IdleConnTimeout: timeout, - } + defer cst.close() + tr := cst.tr + tr.IdleConnTimeout = timeout defer tr.CloseIdleConnections() c := &Client{Transport: tr} + idleConns := func() []string { + if h2 { + return tr.IdleConnStrsForTesting_h2() + } else { + return tr.IdleConnStrsForTesting() + } + } + var conn string doReq := func(n int) { - req, _ := NewRequest("GET", ts.URL, nil) + req, _ := NewRequest("GET", cst.ts.URL, nil) req = req.WithContext(httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{ PutIdleConn: func(err error) { if err != nil { @@ -3514,7 +3523,7 @@ func TestTransportIdleConnTimeout(t *testing.T) { t.Fatal(err) } res.Body.Close() - conns := tr.IdleConnStrsForTesting() + conns := idleConns() if len(conns) != 1 { t.Fatalf("req %v: unexpected number of idle conns: %q", n, conns) } @@ -3530,7 +3539,7 @@ func TestTransportIdleConnTimeout(t *testing.T) { time.Sleep(timeout / 2) } time.Sleep(timeout * 3 / 2) - if got := tr.IdleConnStrsForTesting(); len(got) != 0 { + if got := idleConns(); len(got) != 0 { t.Errorf("idle conns = %q; want none", got) } }