1
0
mirror of https://github.com/golang/go synced 2024-11-23 14:30:02 -07:00

net/http: update bundled x/net/http2

This updates the bundled copy of x/net/http2 to x/net git rev a8e8f92cd6 for:

    http2: remove extra goroutine stack from awaitGracefulShutdown
    https://golang.org/cl/43230

    http2: Discard DATA frames from the server after the response body is closed
    https://golang.org/cl/43810

Fixes #20302
Fixes #18471
Fixes #20448

Change-Id: I00972836deb2fe6049f631ee44901732a641b171
Reviewed-on: https://go-review.googlesource.com/44006
Reviewed-by: Tom Bergan <tombergan@google.com>
This commit is contained in:
Brad Fitzpatrick 2017-05-24 00:05:04 +00:00
parent 1a63f116c1
commit e6e6cad632

View File

@ -2933,6 +2933,11 @@ func http2reqBodyIsNoBody(body io.ReadCloser) bool {
return body == NoBody return body == NoBody
} }
func http2configureServer19(s *Server, conf *http2Server) error {
s.RegisterOnShutdown(conf.state.startGracefulShutdown)
return nil
}
var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
type http2goroutineLock uint64 type http2goroutineLock uint64
@ -3515,12 +3520,12 @@ func http2validPseudoPath(v string) bool {
// underlying buffer is an interface. (io.Pipe is always unbuffered) // underlying buffer is an interface. (io.Pipe is always unbuffered)
type http2pipe struct { type http2pipe struct {
mu sync.Mutex mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu c sync.Cond // c.L lazily initialized to &p.mu
b http2pipeBuffer b http2pipeBuffer // nil when done reading
err error // read error once empty. non-nil means closed. err error // read error once empty. non-nil means closed.
breakErr error // immediate read error (caller doesn't see rest of b) breakErr error // immediate read error (caller doesn't see rest of b)
donec chan struct{} // closed on error donec chan struct{} // closed on error
readFn func() // optional code to run in Read before error readFn func() // optional code to run in Read before error
} }
type http2pipeBuffer interface { type http2pipeBuffer interface {
@ -3532,6 +3537,9 @@ type http2pipeBuffer interface {
func (p *http2pipe) Len() int { func (p *http2pipe) Len() int {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.b == nil {
return 0
}
return p.b.Len() return p.b.Len()
} }
@ -3555,6 +3563,7 @@ func (p *http2pipe) Read(d []byte) (n int, err error) {
p.readFn() p.readFn()
p.readFn = nil p.readFn = nil
} }
p.b = nil
return 0, p.err return 0, p.err
} }
p.c.Wait() p.c.Wait()
@ -3575,6 +3584,9 @@ func (p *http2pipe) Write(d []byte) (n int, err error) {
if p.err != nil { if p.err != nil {
return 0, http2errClosedPipeWrite return 0, http2errClosedPipeWrite
} }
if p.breakErr != nil {
return len(d), nil
}
return p.b.Write(d) return p.b.Write(d)
} }
@ -3609,6 +3621,9 @@ func (p *http2pipe) closeWithError(dst *error, err error, fn func()) {
return return
} }
p.readFn = fn p.readFn = fn
if dst == &p.breakErr {
p.b = nil
}
*dst = err *dst = err
p.closeDoneLocked() p.closeDoneLocked()
} }
@ -3728,6 +3743,11 @@ type http2Server struct {
// NewWriteScheduler constructs a write scheduler for a connection. // NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen. // If nil, a default scheduler is chosen.
NewWriteScheduler func() http2WriteScheduler NewWriteScheduler func() http2WriteScheduler
// Internal state. This is a pointer (rather than embedded directly)
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *http2serverInternalState
} }
func (s *http2Server) initialConnRecvWindowSize() int32 { func (s *http2Server) initialConnRecvWindowSize() int32 {
@ -3758,6 +3778,40 @@ func (s *http2Server) maxConcurrentStreams() uint32 {
return http2defaultMaxStreams return http2defaultMaxStreams
} }
type http2serverInternalState struct {
mu sync.Mutex
activeConns map[*http2serverConn]struct{}
}
func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
if s == nil {
return
}
s.mu.Lock()
s.activeConns[sc] = struct{}{}
s.mu.Unlock()
}
func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) {
if s == nil {
return
}
s.mu.Lock()
delete(s.activeConns, sc)
s.mu.Unlock()
}
func (s *http2serverInternalState) startGracefulShutdown() {
if s == nil {
return
}
s.mu.Lock()
for sc := range s.activeConns {
sc.startGracefulShutdown()
}
s.mu.Unlock()
}
// ConfigureServer adds HTTP/2 support to a net/http Server. // ConfigureServer adds HTTP/2 support to a net/http Server.
// //
// The configuration conf may be nil. // The configuration conf may be nil.
@ -3770,9 +3824,13 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
if conf == nil { if conf == nil {
conf = new(http2Server) conf = new(http2Server)
} }
conf.state = &http2serverInternalState{activeConns: make(map[*http2serverConn]struct{})}
if err := http2configureServer18(s, conf); err != nil { if err := http2configureServer18(s, conf); err != nil {
return err return err
} }
if err := http2configureServer19(s, conf); err != nil {
return err
}
if s.TLSConfig == nil { if s.TLSConfig == nil {
s.TLSConfig = new(tls.Config) s.TLSConfig = new(tls.Config)
@ -3887,7 +3945,7 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
streams: make(map[uint32]*http2stream), streams: make(map[uint32]*http2stream),
readFrameCh: make(chan http2readFrameResult), readFrameCh: make(chan http2readFrameResult),
wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), wantWriteFrameCh: make(chan http2FrameWriteRequest, 8),
wantStartPushCh: make(chan http2startPushRequest, 8), serveMsgCh: make(chan interface{}, 8),
wroteFrameCh: make(chan http2frameWriteResult, 1), wroteFrameCh: make(chan http2frameWriteResult, 1),
bodyReadCh: make(chan http2bodyReadMsg), bodyReadCh: make(chan http2bodyReadMsg),
doneServing: make(chan struct{}), doneServing: make(chan struct{}),
@ -3900,6 +3958,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
pushEnabled: true, pushEnabled: true,
} }
s.state.registerConn(sc)
defer s.state.unregisterConn(sc)
if sc.hs.WriteTimeout != 0 { if sc.hs.WriteTimeout != 0 {
sc.conn.SetWriteDeadline(time.Time{}) sc.conn.SetWriteDeadline(time.Time{})
} }
@ -3966,10 +4027,9 @@ type http2serverConn struct {
doneServing chan struct{} // closed when serverConn.serve ends doneServing chan struct{} // closed when serverConn.serve ends
readFrameCh chan http2readFrameResult // written by serverConn.readFrames readFrameCh chan http2readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan http2FrameWriteRequest // from handlers -> serve wantWriteFrameCh chan http2FrameWriteRequest // from handlers -> serve
wantStartPushCh chan http2startPushRequest // from handlers -> serve
wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan http2bodyReadMsg // from handlers -> serve bodyReadCh chan http2bodyReadMsg // from handlers -> serve
testHookCh chan func(int) // code to run on the serve loop serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
flow http2flow // conn-wide (not stream-specific) outbound flow control flow http2flow // conn-wide (not stream-specific) outbound flow control
inflow http2flow // conn-wide inbound flow control inflow http2flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http tlsState *tls.ConnectionState // shared by all handlers, like net/http
@ -4001,14 +4061,15 @@ type http2serverConn struct {
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode http2ErrCode goAwayCode http2ErrCode
shutdownTimerCh <-chan time.Time // nil until used shutdownTimer *time.Timer // nil until used
shutdownTimer *time.Timer // nil until used idleTimer *time.Timer // nil if unused
idleTimer *time.Timer // nil if unused
idleTimerCh <-chan time.Time // nil if unused
// Owned by the writeFrameAsync goroutine: // Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer headerWriteBuf bytes.Buffer
hpackEncoder *hpack.Encoder hpackEncoder *hpack.Encoder
// Used by startGracefulShutdown.
shutdownOnce sync.Once
} }
func (sc *http2serverConn) maxHeaderListSize() uint32 { func (sc *http2serverConn) maxHeaderListSize() uint32 {
@ -4295,19 +4356,15 @@ func (sc *http2serverConn) serve() {
sc.setConnState(StateIdle) sc.setConnState(StateIdle)
if sc.srv.IdleTimeout != 0 { if sc.srv.IdleTimeout != 0 {
sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout) sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop() defer sc.idleTimer.Stop()
sc.idleTimerCh = sc.idleTimer.C
}
var gracefulShutdownCh <-chan struct{}
if sc.hs != nil {
gracefulShutdownCh = http2h1ServerShutdownChan(sc.hs)
} }
go sc.readFrames() go sc.readFrames()
settingsTimer := time.NewTimer(http2firstSettingsTimeout) settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
loopNum := 0 loopNum := 0
for { for {
loopNum++ loopNum++
@ -4318,8 +4375,6 @@ func (sc *http2serverConn) serve() {
break break
} }
sc.writeFrame(wr) sc.writeFrame(wr)
case spr := <-sc.wantStartPushCh:
sc.startPush(spr)
case res := <-sc.wroteFrameCh: case res := <-sc.wroteFrameCh:
sc.wroteFrame(res) sc.wroteFrame(res)
case res := <-sc.readFrameCh: case res := <-sc.readFrameCh:
@ -4327,26 +4382,37 @@ func (sc *http2serverConn) serve() {
return return
} }
res.readMore() res.readMore()
if settingsTimer.C != nil { if settingsTimer != nil {
settingsTimer.Stop() settingsTimer.Stop()
settingsTimer.C = nil settingsTimer = nil
} }
case m := <-sc.bodyReadCh: case m := <-sc.bodyReadCh:
sc.noteBodyRead(m.st, m.n) sc.noteBodyRead(m.st, m.n)
case <-settingsTimer.C: case msg := <-sc.serveMsgCh:
sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) switch v := msg.(type) {
return case func(int):
case <-gracefulShutdownCh: v(loopNum)
gracefulShutdownCh = nil case *http2serverMessage:
sc.startGracefulShutdown() switch v {
case <-sc.shutdownTimerCh: case http2settingsTimerMsg:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
return return
case <-sc.idleTimerCh: case http2idleTimerMsg:
sc.vlogf("connection is idle") sc.vlogf("connection is idle")
sc.goAway(http2ErrCodeNo) sc.goAway(http2ErrCodeNo)
case fn := <-sc.testHookCh: case http2shutdownTimerMsg:
fn(loopNum) sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
case http2gracefulShutdownMsg:
sc.startGracefulShutdownInternal()
default:
panic("unknown timer")
}
case *http2startPushRequest:
sc.startPush(v)
default:
panic(fmt.Sprintf("unexpected type %T", v))
}
} }
if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
@ -4355,6 +4421,38 @@ func (sc *http2serverConn) serve() {
} }
} }
func (sc *http2serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
select {
case <-sc.doneServing:
case <-sharedCh:
close(privateCh)
}
}
type http2serverMessage int
// Message values sent to serveMsgCh.
var (
http2settingsTimerMsg = new(http2serverMessage)
http2idleTimerMsg = new(http2serverMessage)
http2shutdownTimerMsg = new(http2serverMessage)
http2gracefulShutdownMsg = new(http2serverMessage)
)
func (sc *http2serverConn) onSettingsTimer() { sc.sendServeMsg(http2settingsTimerMsg) }
func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) }
func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) }
func (sc *http2serverConn) sendServeMsg(msg interface{}) {
sc.serveG.checkNotOn()
select {
case sc.serveMsgCh <- msg:
case <-sc.doneServing:
}
}
// readPreface reads the ClientPreface greeting from the peer // readPreface reads the ClientPreface greeting from the peer
// or returns an error on timeout or an invalid greeting. // or returns an error on timeout or an invalid greeting.
func (sc *http2serverConn) readPreface() error { func (sc *http2serverConn) readPreface() error {
@ -4630,10 +4728,19 @@ func (sc *http2serverConn) scheduleFrameWrite() {
sc.inFrameScheduleLoop = false sc.inFrameScheduleLoop = false
} }
// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the // startGracefulShutdown gracefully shuts down a connection. This
// client we're gracefully shutting down. The connection isn't closed // sends GOAWAY with ErrCodeNo to tell the client we're gracefully
// until all current streams are done. // shutting down. The connection isn't closed until all current
// streams are done.
//
// startGracefulShutdown returns immediately; it does not wait until
// the connection has shut down.
func (sc *http2serverConn) startGracefulShutdown() { func (sc *http2serverConn) startGracefulShutdown() {
sc.serveG.checkNotOn()
sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) })
}
func (sc *http2serverConn) startGracefulShutdownInternal() {
sc.goAwayIn(http2ErrCodeNo, 0) sc.goAwayIn(http2ErrCodeNo, 0)
} }
@ -4665,8 +4772,7 @@ func (sc *http2serverConn) goAwayIn(code http2ErrCode, forceCloseIn time.Duratio
func (sc *http2serverConn) shutDownIn(d time.Duration) { func (sc *http2serverConn) shutDownIn(d time.Duration) {
sc.serveG.check() sc.serveG.check()
sc.shutdownTimer = time.NewTimer(d) sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
sc.shutdownTimerCh = sc.shutdownTimer.C
} }
func (sc *http2serverConn) resetStream(se http2StreamError) { func (sc *http2serverConn) resetStream(se http2StreamError) {
@ -4839,7 +4945,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
sc.idleTimer.Reset(sc.srv.IdleTimeout) sc.idleTimer.Reset(sc.srv.IdleTimeout)
} }
if http2h1ServerKeepAlivesDisabled(sc.hs) { if http2h1ServerKeepAlivesDisabled(sc.hs) {
sc.startGracefulShutdown() sc.startGracefulShutdownInternal()
} }
} }
if p := st.body; p != nil { if p := st.body; p != nil {
@ -4988,7 +5094,7 @@ func (sc *http2serverConn) processGoAway(f *http2GoAwayFrame) error {
} else { } else {
sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
} }
sc.startGracefulShutdown() sc.startGracefulShutdownInternal()
sc.pushEnabled = false sc.pushEnabled = false
return nil return nil
@ -5918,7 +6024,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
return fmt.Errorf("method %q must be GET or HEAD", opts.Method) return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
} }
msg := http2startPushRequest{ msg := &http2startPushRequest{
parent: st, parent: st,
method: opts.Method, method: opts.Method,
url: u, url: u,
@ -5931,7 +6037,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
return http2errClientDisconnected return http2errClientDisconnected
case <-st.cw: case <-st.cw:
return http2errStreamClosed return http2errStreamClosed
case sc.wantStartPushCh <- msg: case sc.serveMsgCh <- msg:
} }
select { select {
@ -5953,7 +6059,7 @@ type http2startPushRequest struct {
done chan error done chan error
} }
func (sc *http2serverConn) startPush(msg http2startPushRequest) { func (sc *http2serverConn) startPush(msg *http2startPushRequest) {
sc.serveG.check() sc.serveG.check()
if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote { if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote {
@ -5979,7 +6085,7 @@ func (sc *http2serverConn) startPush(msg http2startPushRequest) {
} }
if sc.maxPushPromiseID+2 >= 1<<31 { if sc.maxPushPromiseID+2 >= 1<<31 {
sc.startGracefulShutdown() sc.startGracefulShutdownInternal()
return 0, http2ErrPushLimitReached return 0, http2ErrPushLimitReached
} }
sc.maxPushPromiseID += 2 sc.maxPushPromiseID += 2
@ -6099,31 +6205,6 @@ var http2badTrailer = map[string]bool{
"Www-Authenticate": true, "Www-Authenticate": true,
} }
// h1ServerShutdownChan returns a channel that will be closed when the
// provided *http.Server wants to shut down.
//
// This is a somewhat hacky way to get at http1 innards. It works
// when the http2 code is bundled into the net/http package in the
// standard library. The alternatives ended up making the cmd/go tool
// depend on http Servers. This is the lightest option for now.
// This is tested via the TestServeShutdown* tests in net/http.
func http2h1ServerShutdownChan(hs *Server) <-chan struct{} {
if fn := http2testh1ServerShutdownChan; fn != nil {
return fn(hs)
}
var x interface{} = hs
type I interface {
getDoneChan() <-chan struct{}
}
if hs, ok := x.(I); ok {
return hs.getDoneChan()
}
return nil
}
// optional test hook for h1ServerShutdownChan.
var http2testh1ServerShutdownChan func(hs *Server) <-chan struct{}
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why // disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way. // the code is written this way.
@ -7681,6 +7762,7 @@ func (b http2transportResponseBody) Close() error {
cc.wmu.Lock() cc.wmu.Lock()
if !serverSentStreamEnd { if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
cs.didReset = true
} }
if unread > 0 { if unread > 0 {
@ -7723,11 +7805,6 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
return nil return nil
} }
if f.Length > 0 { if f.Length > 0 {
if len(data) > 0 && cs.bufPipe.b == nil {
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
return http2ConnectionError(http2ErrCodeProtocol)
}
cc.mu.Lock() cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) { if cs.inflow.available() >= int32(f.Length) {