From 10316757cec3c2744ea61088e0fc905cfeb28fb2 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 5 Aug 2016 16:42:31 +0000 Subject: [PATCH] net/http: update bundled http2 for flow control window adjustment fix Updates bundled http2 to x/net/http2 git rev 075e191 for: http2: adjust flow control on open streams when processing SETTINGS https://golang.org/cl/25508 Fixes #16612 Change-Id: Ib0513201bff44ab747a574ae6894479325c105d2 Reviewed-on: https://go-review.googlesource.com/25543 Run-TryBot: Chris Broadfoot Reviewed-by: Ian Lance Taylor Reviewed-by: Chris Broadfoot TryBot-Result: Gobot Gobot --- src/net/http/h2_bundle.go | 112 ++++++++++++++++++++++++++------------ 1 file changed, 78 insertions(+), 34 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index cd66c0960e7..ffe15f0605e 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -28,6 +28,7 @@ import ( "io" "io/ioutil" "log" + "math" "net" "net/http/httptrace" "net/textproto" @@ -403,9 +404,17 @@ func (e http2ConnectionError) Error() string { type http2StreamError struct { StreamID uint32 Code http2ErrCode + Cause error // optional additional detail +} + +func http2streamError(id uint32, code http2ErrCode) http2StreamError { + return http2StreamError{StreamID: id, Code: code} } func (e http2StreamError) Error() string { + if e.Cause != nil { + return fmt.Sprintf("stream error: stream ID %d; %v; %v", e.StreamID, e.Code, e.Cause) + } return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code) } @@ -1366,7 +1375,7 @@ func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, err if fh.StreamID == 0 { return nil, http2ConnectionError(http2ErrCodeProtocol) } - return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} + return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) } return &http2WindowUpdateFrame{ http2FrameHeader: fh, @@ -1444,7 +1453,7 @@ func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err er } } if len(p)-int(padLength) <= 0 { - return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} + return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) } hf.headerFragBuf = p[:len(p)-int(padLength)] return hf, nil @@ -1911,6 +1920,9 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr hdec.SetEmitEnabled(true) hdec.SetMaxStringLength(fr.maxHeaderStringLen()) hdec.SetEmitFunc(func(hf hpack.HeaderField) { + if http2VerboseLogs && http2logFrameReads { + log.Printf("http2: decoded hpack field %+v", hf) + } if !httplex.ValidHeaderFieldValue(hf.Value) { invalid = http2headerFieldValueError(hf.Value) } @@ -1969,11 +1981,17 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr } if invalid != nil { fr.errDetail = invalid - return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol} + if http2VerboseLogs { + log.Printf("http2: invalid header: %v", invalid) + } + return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, invalid} } if err := mh.checkPseudos(); err != nil { fr.errDetail = err - return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol} + if http2VerboseLogs { + log.Printf("http2: invalid pseudo headers: %v", err) + } + return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, err} } return mh, nil } @@ -3604,7 +3622,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { case http2stateOpen: st.state = http2stateHalfClosedLocal - errCancel := http2StreamError{st.id, http2ErrCodeCancel} + errCancel := http2streamError(st.id, http2ErrCodeCancel) sc.resetStream(errCancel) case http2stateHalfClosedRemote: sc.closeStream(st, http2errHandlerComplete) @@ -3797,7 +3815,7 @@ func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error return nil } if !st.flow.add(int32(f.Increment)) { - return http2StreamError{f.StreamID, http2ErrCodeFlowControl} + return http2streamError(f.StreamID, http2ErrCodeFlowControl) } default: if !sc.flow.add(int32(f.Increment)) { @@ -3819,7 +3837,7 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { if st != nil { st.gotReset = true st.cancelCtx() - sc.closeStream(st, http2StreamError{f.StreamID, f.ErrCode}) + sc.closeStream(st, http2streamError(f.StreamID, f.ErrCode)) } return nil } @@ -3922,13 +3940,13 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { if !ok || st.state != http2stateOpen || st.gotTrailerHeader { if sc.inflow.available() < int32(f.Length) { - return http2StreamError{id, http2ErrCodeFlowControl} + return http2streamError(id, http2ErrCodeFlowControl) } sc.inflow.take(int32(f.Length)) sc.sendWindowUpdate(nil, int(f.Length)) - return http2StreamError{id, http2ErrCodeStreamClosed} + return http2streamError(id, http2ErrCodeStreamClosed) } if st.body == nil { panic("internal error: should have a body in this state") @@ -3936,19 +3954,19 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) - return http2StreamError{id, http2ErrCodeStreamClosed} + return http2streamError(id, http2ErrCodeStreamClosed) } if f.Length > 0 { if st.inflow.available() < int32(f.Length) { - return http2StreamError{id, http2ErrCodeFlowControl} + return http2streamError(id, http2ErrCodeFlowControl) } st.inflow.take(int32(f.Length)) if len(data) > 0 { wrote, err := st.body.Write(data) if err != nil { - return http2StreamError{id, http2ErrCodeStreamClosed} + return http2streamError(id, http2ErrCodeStreamClosed) } if wrote != len(data) { panic("internal error: bad Writer") @@ -4046,10 +4064,10 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { if sc.unackedSettings == 0 { - return http2StreamError{st.id, http2ErrCodeProtocol} + return http2streamError(st.id, http2ErrCodeProtocol) } - return http2StreamError{st.id, http2ErrCodeRefusedStream} + return http2streamError(st.id, http2ErrCodeRefusedStream) } rw, req, err := sc.newWriterAndRequest(st, f) @@ -4083,18 +4101,18 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { } st.gotTrailerHeader = true if !f.StreamEnded() { - return http2StreamError{st.id, http2ErrCodeProtocol} + return http2streamError(st.id, http2ErrCodeProtocol) } if len(f.PseudoFields()) > 0 { - return http2StreamError{st.id, http2ErrCodeProtocol} + return http2streamError(st.id, http2ErrCodeProtocol) } if st.trailer != nil { for _, hf := range f.RegularFields() { key := sc.canonicalHeader(hf.Name) if !http2ValidTrailerHeader(key) { - return http2StreamError{st.id, http2ErrCodeProtocol} + return http2streamError(st.id, http2ErrCodeProtocol) } st.trailer[key] = append(st.trailer[key], hf.Value) } @@ -4148,18 +4166,18 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead isConnect := method == "CONNECT" if isConnect { if path != "" || scheme != "" || authority == "" { - return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} + return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } } else if method == "" || path == "" || (scheme != "https" && scheme != "http") { - return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} + return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } bodyOpen := !f.StreamEnded() if method == "HEAD" && bodyOpen { - return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} + return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } var tlsState *tls.ConnectionState // nil if not scheme https @@ -4216,7 +4234,7 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead var err error url_, err = url.ParseRequestURI(path) if err != nil { - return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} + return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) } requestURI = path } @@ -4993,14 +5011,14 @@ type http2ClientConn struct { br *bufio.Reader fr *http2Framer lastActive time.Time - - // Settings from peer: + // Settings from peer: (also guarded by mu) maxFrameSize uint32 maxConcurrentStreams uint32 initialWindowSize uint32 - hbuf bytes.Buffer // HPACK encoder writes into this - henc *hpack.Encoder - freeBuf [][]byte + + hbuf bytes.Buffer // HPACK encoder writes into this + henc *hpack.Encoder + freeBuf [][]byte wmu sync.Mutex // held while writing; acquire AFTER mu if holding both werr error // first write error that has occurred @@ -5244,10 +5262,6 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { } func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { - if http2VerboseLogs { - t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr()) - } - cc := &http2ClientConn{ t: t, tconn: c, @@ -5260,6 +5274,10 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client singleUse: singleUse, wantSettingsAck: true, } + if http2VerboseLogs { + t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) + } + cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(http2initialWindowSize)) @@ -5324,7 +5342,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { } return cc.goAway == nil && !cc.closed && int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && - cc.nextStreamID < 2147483647 + cc.nextStreamID < math.MaxInt32 } func (cc *http2ClientConn) closeIfIdle() { @@ -5334,9 +5352,13 @@ func (cc *http2ClientConn) closeIfIdle() { return } cc.closed = true + nextID := cc.nextStreamID cc.mu.Unlock() + if http2VerboseLogs { + cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) + } cc.tconn.Close() } @@ -5986,11 +6008,15 @@ func (rl *http2clientConnReadLoop) run() error { for { f, err := cc.fr.ReadFrame() if err != nil { - cc.vlogf("Transport readFrame error: (%T) %v", err, err) + cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { if cs := cc.streamByID(se.StreamID, true); cs != nil { - rl.endStreamError(cs, cc.fr.errDetail) + cs.cc.writeStreamReset(cs.ID, se.Code, err) + if se.Cause == nil { + se.Cause = cc.fr.errDetail + } + rl.endStreamError(cs, se) } continue } else if err != nil { @@ -6034,6 +6060,9 @@ func (rl *http2clientConnReadLoop) run() error { cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { + if http2VerboseLogs { + cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err) + } return err } if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { @@ -6381,6 +6410,11 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err if http2isConnectionCloseRequest(cs.req) { rl.closeWhenIdle = true } + + select { + case cs.resc <- http2resAndError{err: err}: + default: + } } func (cs *http2clientStream) copyTrailers() { @@ -6425,6 +6459,16 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error cc.maxConcurrentStreams = s.Val case http2SettingInitialWindowSize: + if s.Val > math.MaxInt32 { + return http2ConnectionError(http2ErrCodeFlowControl) + } + + delta := int32(s.Val) - int32(cc.initialWindowSize) + for _, cs := range cc.streams { + cs.flow.add(delta) + } + cc.cond.Broadcast() + cc.initialWindowSize = s.Val default: @@ -6475,7 +6519,7 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er case <-cs.peerReset: default: - err := http2StreamError{cs.ID, f.ErrCode} + err := http2streamError(cs.ID, f.ErrCode) cs.resetErr = err close(cs.peerReset) cs.bufPipe.CloseWithError(err)