diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 4536b2ff5d..36ebeeaf34 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -277,7 +277,7 @@ func http2filterOutClientConn(in []*http2ClientConn, exclude *http2ClientConn) [ } // noDialClientConnPool is an implementation of http2.ClientConnPool -// which never dials. We let the HTTP/1.1 client dial and use its TLS +// which never dials. We let the HTTP/1.1 client dial and use its TLS // connection instead. type http2noDialClientConnPool struct{ *http2clientConnPool } @@ -349,6 +349,139 @@ func (rt http2noDialH2RoundTripper) RoundTrip(req *Request) (*Response, error) { return res, err } +// Buffer chunks are allocated from a pool to reduce pressure on GC. +// The maximum wasted space per dataBuffer is 2x the largest size class, +// which happens when the dataBuffer has multiple chunks and there is +// one unread byte in both the first and last chunks. We use a few size +// classes to minimize overheads for servers that typically receive very +// small request bodies. +// +// TODO: Benchmark to determine if the pools are necessary. The GC may have +// improved enough that we can instead allocate chunks like this: +// make([]byte, max(16<<10, expectedBytesRemaining)) +var ( + http2dataChunkSizeClasses = []int{ + 1 << 10, + 2 << 10, + 4 << 10, + 8 << 10, + 16 << 10, + } + http2dataChunkPools = [...]sync.Pool{ + {New: func() interface{} { return make([]byte, 1<<10) }}, + {New: func() interface{} { return make([]byte, 2<<10) }}, + {New: func() interface{} { return make([]byte, 4<<10) }}, + {New: func() interface{} { return make([]byte, 8<<10) }}, + {New: func() interface{} { return make([]byte, 16<<10) }}, + } +) + +func http2getDataBufferChunk(size int64) []byte { + i := 0 + for ; i < len(http2dataChunkSizeClasses)-1; i++ { + if size <= int64(http2dataChunkSizeClasses[i]) { + break + } + } + return http2dataChunkPools[i].Get().([]byte) +} + +func http2putDataBufferChunk(p []byte) { + for i, n := range http2dataChunkSizeClasses { + if len(p) == n { + http2dataChunkPools[i].Put(p) + return + } + } + panic(fmt.Sprintf("unexpected buffer len=%v", len(p))) +} + +// dataBuffer is an io.ReadWriter backed by a list of data chunks. +// Each dataBuffer is used to read DATA frames on a single stream. +// The buffer is divided into chunks so the server can limit the +// total memory used by a single connection without limiting the +// request body size on any single stream. +type http2dataBuffer struct { + chunks [][]byte + r int // next byte to read is chunks[0][r] + w int // next byte to write is chunks[len(chunks)-1][w] + size int // total buffered bytes + expected int64 // we expect at least this many bytes in future Write calls (ignored if <= 0) +} + +var http2errReadEmpty = errors.New("read from empty dataBuffer") + +// Read copies bytes from the buffer into p. +// It is an error to read when no data is available. +func (b *http2dataBuffer) Read(p []byte) (int, error) { + if b.size == 0 { + return 0, http2errReadEmpty + } + var ntotal int + for len(p) > 0 && b.size > 0 { + readFrom := b.bytesFromFirstChunk() + n := copy(p, readFrom) + p = p[n:] + ntotal += n + b.r += n + b.size -= n + + if b.r == len(b.chunks[0]) { + http2putDataBufferChunk(b.chunks[0]) + end := len(b.chunks) - 1 + copy(b.chunks[:end], b.chunks[1:]) + b.chunks[end] = nil + b.chunks = b.chunks[:end] + b.r = 0 + } + } + return ntotal, nil +} + +func (b *http2dataBuffer) bytesFromFirstChunk() []byte { + if len(b.chunks) == 1 { + return b.chunks[0][b.r:b.w] + } + return b.chunks[0][b.r:] +} + +// Len returns the number of bytes of the unread portion of the buffer. +func (b *http2dataBuffer) Len() int { + return b.size +} + +// Write appends p to the buffer. +func (b *http2dataBuffer) Write(p []byte) (int, error) { + ntotal := len(p) + for len(p) > 0 { + + want := int64(len(p)) + if b.expected > want { + want = b.expected + } + chunk := b.lastChunkOrAlloc(want) + n := copy(chunk[b.w:], p) + p = p[n:] + b.w += n + b.size += n + b.expected -= int64(n) + } + return ntotal, nil +} + +func (b *http2dataBuffer) lastChunkOrAlloc(want int64) []byte { + if len(b.chunks) != 0 { + last := b.chunks[len(b.chunks)-1] + if b.w < len(last) { + return last + } + } + chunk := http2getDataBufferChunk(want) + b.chunks = append(b.chunks, chunk) + b.w = 0 + return chunk +} + // An ErrCode is an unsigned 32-bit error code as defined in the HTTP/2 spec. type http2ErrCode uint32 @@ -469,56 +602,6 @@ var ( http2errPseudoAfterRegular = errors.New("pseudo header field after regular") ) -// fixedBuffer is an io.ReadWriter backed by a fixed size buffer. -// It never allocates, but moves old data as new data is written. -type http2fixedBuffer struct { - buf []byte - r, w int -} - -var ( - http2errReadEmpty = errors.New("read from empty fixedBuffer") - http2errWriteFull = errors.New("write on full fixedBuffer") -) - -// Read copies bytes from the buffer into p. -// It is an error to read when no data is available. -func (b *http2fixedBuffer) Read(p []byte) (n int, err error) { - if b.r == b.w { - return 0, http2errReadEmpty - } - n = copy(p, b.buf[b.r:b.w]) - b.r += n - if b.r == b.w { - b.r = 0 - b.w = 0 - } - return n, nil -} - -// Len returns the number of bytes of the unread portion of the buffer. -func (b *http2fixedBuffer) Len() int { - return b.w - b.r -} - -// Write copies bytes from p into the buffer. -// It is an error to write more data than the buffer can hold. -func (b *http2fixedBuffer) Write(p []byte) (n int, err error) { - - if b.r > 0 && len(p) > len(b.buf)-b.w { - copy(b.buf, b.buf[b.r:b.w]) - b.w -= b.r - b.r = 0 - } - - n = copy(b.buf[b.w:], p) - b.w += n - if n < len(p) { - err = http2errWriteFull - } - return n, err -} - // flow is the flow control window's size. type http2flow struct { // n is the number of DATA bytes we're allowed to send. @@ -666,7 +749,7 @@ var http2flagName = map[http2FrameType]map[http2Flags]string{ // a frameParser parses a frame given its FrameHeader and payload // bytes. The length of payload will always equal fh.Length (which // might be 0). -type http2frameParser func(fh http2FrameHeader, payload []byte) (http2Frame, error) +type http2frameParser func(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) var http2frameParsers = map[http2FrameType]http2frameParser{ http2FrameData: http2parseDataFrame, @@ -861,6 +944,8 @@ type http2Framer struct { debugFramerBuf *bytes.Buffer debugReadLoggerf func(string, ...interface{}) debugWriteLoggerf func(string, ...interface{}) + + frameCache *http2frameCache // nil if frames aren't reused (default) } func (fr *http2Framer) maxHeaderListSize() uint32 { @@ -937,6 +1022,27 @@ const ( http2maxFrameSize = 1<<24 - 1 ) +// SetReuseFrames allows the Framer to reuse Frames. +// If called on a Framer, Frames returned by calls to ReadFrame are only +// valid until the next call to ReadFrame. +func (fr *http2Framer) SetReuseFrames() { + if fr.frameCache != nil { + return + } + fr.frameCache = &http2frameCache{} +} + +type http2frameCache struct { + dataFrame http2DataFrame +} + +func (fc *http2frameCache) getDataFrame() *http2DataFrame { + if fc == nil { + return &http2DataFrame{} + } + return &fc.dataFrame +} + // NewFramer returns a Framer that writes frames to w and reads them from r. func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { fr := &http2Framer{ @@ -1016,7 +1122,7 @@ func (fr *http2Framer) ReadFrame() (http2Frame, error) { if _, err := io.ReadFull(fr.r, payload); err != nil { return nil, err } - f, err := http2typeFrameParser(fh.Type)(fh, payload) + f, err := http2typeFrameParser(fh.Type)(fr.frameCache, fh, payload) if err != nil { if ce, ok := err.(http2connError); ok { return nil, fr.connError(ce.Code, ce.Reason) @@ -1104,14 +1210,14 @@ func (f *http2DataFrame) Data() []byte { return f.data } -func http2parseDataFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { if fh.StreamID == 0 { return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"} } - f := &http2DataFrame{ - http2FrameHeader: fh, - } + f := fc.getDataFrame() + f.http2FrameHeader = fh + var padSize byte if fh.Flags.Has(http2FlagDataPadded) { var err error @@ -1132,6 +1238,7 @@ var ( http2errStreamID = errors.New("invalid stream ID") http2errDepStreamID = errors.New("invalid dependent stream ID") http2errPadLength = errors.New("pad length too large") + http2errPadBytes = errors.New("padding bytes must all be zeros unless AllowIllegalWrites is enabled") ) func http2validStreamIDOrZero(streamID uint32) bool { @@ -1155,6 +1262,7 @@ func (f *http2Framer) WriteData(streamID uint32, endStream bool, data []byte) er // // If pad is nil, the padding bit is not sent. // The length of pad must not exceed 255 bytes. +// The bytes of pad must all be zero, unless f.AllowIllegalWrites is set. // // It will perform exactly one Write to the underlying Writer. // It is the caller's responsibility not to violate the maximum frame size @@ -1163,8 +1271,18 @@ func (f *http2Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad if !http2validStreamID(streamID) && !f.AllowIllegalWrites { return http2errStreamID } - if len(pad) > 255 { - return http2errPadLength + if len(pad) > 0 { + if len(pad) > 255 { + return http2errPadLength + } + if !f.AllowIllegalWrites { + for _, b := range pad { + if b != 0 { + + return http2errPadBytes + } + } + } } var flags http2Flags if endStream { @@ -1192,7 +1310,7 @@ type http2SettingsFrame struct { p []byte } -func http2parseSettingsFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { return nil, http2ConnectionError(http2ErrCodeFrameSize) @@ -1281,7 +1399,7 @@ type http2PingFrame struct { func (f *http2PingFrame) IsAck() bool { return f.Flags.Has(http2FlagPingAck) } -func http2parsePingFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parsePingFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { if len(payload) != 8 { return nil, http2ConnectionError(http2ErrCodeFrameSize) } @@ -1321,7 +1439,7 @@ func (f *http2GoAwayFrame) DebugData() []byte { return f.debugData } -func http2parseGoAwayFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseGoAwayFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if fh.StreamID != 0 { return nil, http2ConnectionError(http2ErrCodeProtocol) } @@ -1361,7 +1479,7 @@ func (f *http2UnknownFrame) Payload() []byte { return f.p } -func http2parseUnknownFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseUnknownFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { return &http2UnknownFrame{fh, p}, nil } @@ -1372,7 +1490,7 @@ type http2WindowUpdateFrame struct { Increment uint32 // never read with high bit set } -func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if len(p) != 4 { return nil, http2ConnectionError(http2ErrCodeFrameSize) } @@ -1432,7 +1550,7 @@ func (f *http2HeadersFrame) HasPriority() bool { return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority) } -func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { +func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) { hf := &http2HeadersFrame{ http2FrameHeader: fh, } @@ -1556,7 +1674,7 @@ type http2PriorityParam struct { Exclusive bool // Weight is the stream's zero-indexed weight. It should be - // set together with StreamDep, or neither should be set. Per + // set together with StreamDep, or neither should be set. Per // the spec, "Add one to the value to obtain a weight between // 1 and 256." Weight uint8 @@ -1566,7 +1684,7 @@ func (p http2PriorityParam) IsZero() bool { return p == http2PriorityParam{} } -func http2parsePriorityFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { +func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) { if fh.StreamID == 0 { return nil, http2connError{http2ErrCodeProtocol, "PRIORITY frame with stream ID 0"} } @@ -1613,7 +1731,7 @@ type http2RSTStreamFrame struct { ErrCode http2ErrCode } -func http2parseRSTStreamFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseRSTStreamFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if len(p) != 4 { return nil, http2ConnectionError(http2ErrCodeFrameSize) } @@ -1643,7 +1761,7 @@ type http2ContinuationFrame struct { headerFragBuf []byte } -func http2parseContinuationFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { +func http2parseContinuationFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) { if fh.StreamID == 0 { return nil, http2connError{http2ErrCodeProtocol, "CONTINUATION frame with stream ID 0"} } @@ -1693,7 +1811,7 @@ func (f *http2PushPromiseFrame) HeadersEnded() bool { return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders) } -func http2parsePushPromise(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { +func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) { pp := &http2PushPromiseFrame{ http2FrameHeader: fh, } @@ -2778,7 +2896,7 @@ func http2validPseudoPath(v string) bool { return (len(v) > 0 && v[0] == '/' && (len(v) == 1 || v[1] != '/')) || v == "*" } -// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like +// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like // io.Pipe except there are no PipeReader/PipeWriter halves, and the // underlying buffer is an interface. (io.Pipe is always unbuffered) type http2pipe struct { @@ -2980,11 +3098,38 @@ type http2Server struct { // activity for the purposes of IdleTimeout. IdleTimeout time.Duration + // MaxUploadBufferPerConnection is the size of the initial flow + // control window for each connections. The HTTP/2 spec does not + // allow this to be smaller than 65535 or larger than 2^32-1. + // If the value is outside this range, a default value will be + // used instead. + MaxUploadBufferPerConnection int32 + + // MaxUploadBufferPerStream is the size of the initial flow control + // window for each stream. The HTTP/2 spec does not allow this to + // be larger than 2^32-1. If the value is zero or larger than the + // maximum, a default value will be used instead. + MaxUploadBufferPerStream int32 + // NewWriteScheduler constructs a write scheduler for a connection. // If nil, a default scheduler is chosen. NewWriteScheduler func() http2WriteScheduler } +func (s *http2Server) initialConnRecvWindowSize() int32 { + if s.MaxUploadBufferPerConnection > http2initialWindowSize { + return s.MaxUploadBufferPerConnection + } + return 1 << 20 +} + +func (s *http2Server) initialStreamRecvWindowSize() int32 { + if s.MaxUploadBufferPerStream > 0 { + return s.MaxUploadBufferPerStream + } + return 1 << 20 +} + func (s *http2Server) maxReadFrameSize() uint32 { if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize { return v @@ -3118,27 +3263,27 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { defer cancel() sc := &http2serverConn{ - srv: s, - hs: opts.baseConfig(), - conn: c, - baseCtx: baseCtx, - remoteAddrStr: c.RemoteAddr().String(), - bw: http2newBufferedWriter(c), - handler: opts.handler(), - streams: make(map[uint32]*http2stream), - readFrameCh: make(chan http2readFrameResult), - wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), - wantStartPushCh: make(chan http2startPushRequest, 8), - wroteFrameCh: make(chan http2frameWriteResult, 1), - bodyReadCh: make(chan http2bodyReadMsg), - doneServing: make(chan struct{}), - clientMaxStreams: math.MaxUint32, - advMaxStreams: s.maxConcurrentStreams(), - initialWindowSize: http2initialWindowSize, - maxFrameSize: http2initialMaxFrameSize, - headerTableSize: http2initialHeaderTableSize, - serveG: http2newGoroutineLock(), - pushEnabled: true, + srv: s, + hs: opts.baseConfig(), + conn: c, + baseCtx: baseCtx, + remoteAddrStr: c.RemoteAddr().String(), + bw: http2newBufferedWriter(c), + handler: opts.handler(), + streams: make(map[uint32]*http2stream), + readFrameCh: make(chan http2readFrameResult), + wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), + wantStartPushCh: make(chan http2startPushRequest, 8), + wroteFrameCh: make(chan http2frameWriteResult, 1), + bodyReadCh: make(chan http2bodyReadMsg), + doneServing: make(chan struct{}), + clientMaxStreams: math.MaxUint32, + advMaxStreams: s.maxConcurrentStreams(), + initialStreamSendWindowSize: http2initialWindowSize, + maxFrameSize: http2initialMaxFrameSize, + headerTableSize: http2initialHeaderTableSize, + serveG: http2newGoroutineLock(), + pushEnabled: true, } if sc.hs.WriteTimeout != 0 { @@ -3218,34 +3363,34 @@ type http2serverConn struct { writeSched http2WriteScheduler // Everything following is owned by the serve loop; use serveG.check(): - serveG http2goroutineLock // used to verify funcs are on serve() - pushEnabled bool - sawFirstSettings bool // got the initial SETTINGS frame after the preface - needToSendSettingsAck bool - unackedSettings int // how many SETTINGS have we sent without ACKs? - clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) - advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client - curClientStreams uint32 // number of open streams initiated by the client - curPushedStreams uint32 // number of open streams initiated by server push - maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests - maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes - streams map[uint32]*http2stream - initialWindowSize int32 - maxFrameSize int32 - headerTableSize uint32 - peerMaxHeaderListSize uint32 // zero means unknown (default) - canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case - writingFrame bool // started writing a frame (on serve goroutine or separate) - writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh - needsFrameFlush bool // last frame write wasn't a flush - inGoAway bool // we've started to or sent GOAWAY - inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop - needToSendGoAway bool // we need to schedule a GOAWAY frame write - goAwayCode http2ErrCode - shutdownTimerCh <-chan time.Time // nil until used - shutdownTimer *time.Timer // nil until used - idleTimer *time.Timer // nil if unused - idleTimerCh <-chan time.Time // nil if unused + serveG http2goroutineLock // used to verify funcs are on serve() + pushEnabled bool + sawFirstSettings bool // got the initial SETTINGS frame after the preface + needToSendSettingsAck bool + unackedSettings int // how many SETTINGS have we sent without ACKs? + clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) + advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client + curClientStreams uint32 // number of open streams initiated by the client + curPushedStreams uint32 // number of open streams initiated by server push + maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests + maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes + streams map[uint32]*http2stream + initialStreamSendWindowSize int32 + maxFrameSize int32 + headerTableSize uint32 + peerMaxHeaderListSize uint32 // zero means unknown (default) + canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case + writingFrame bool // started writing a frame (on serve goroutine or separate) + writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh + needsFrameFlush bool // last frame write wasn't a flush + inGoAway bool // we've started to or sent GOAWAY + inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop + needToSendGoAway bool // we need to schedule a GOAWAY frame write + goAwayCode http2ErrCode + shutdownTimerCh <-chan time.Time // nil until used + shutdownTimer *time.Timer // nil until used + idleTimer *time.Timer // nil if unused + idleTimerCh <-chan time.Time // nil if unused // Owned by the writeFrameAsync goroutine: headerWriteBuf bytes.Buffer @@ -3294,10 +3439,9 @@ type http2stream struct { numTrailerValues int64 weight uint8 state http2streamState - resetQueued bool // RST_STREAM queued for write; set by sc.resetStream - gotTrailerHeader bool // HEADER frame for trailers was seen - wroteHeaders bool // whether we wrote headers (not status 100) - reqBuf []byte // if non-nil, body pipe buffer to return later at EOF + resetQueued bool // RST_STREAM queued for write; set by sc.resetStream + gotTrailerHeader bool // HEADER frame for trailers was seen + wroteHeaders bool // whether we wrote headers (not status 100) trailer Header // accumulated trailers reqTrailer Header // handler's Request.Trailer @@ -3518,10 +3662,15 @@ func (sc *http2serverConn) serve() { {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, {http2SettingMaxConcurrentStreams, sc.advMaxStreams}, {http2SettingMaxHeaderListSize, sc.maxHeaderListSize()}, + {http2SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, }, }) sc.unackedSettings++ + if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 { + sc.sendWindowUpdate(nil, int(diff)) + } + if err := sc.readPreface(); err != nil { sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) return @@ -4132,9 +4281,9 @@ func (sc *http2serverConn) processSetting(s http2Setting) error { func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { sc.serveG.check() - old := sc.initialWindowSize - sc.initialWindowSize = int32(val) - growth := sc.initialWindowSize - old + old := sc.initialStreamSendWindowSize + sc.initialStreamSendWindowSize = int32(val) + growth := int32(val) - old for _, st := range sc.streams { if !st.flow.add(growth) { @@ -4395,9 +4544,9 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState } st.cw.Init() st.flow.conn = &sc.flow - st.flow.add(sc.initialWindowSize) + st.flow.add(sc.initialStreamSendWindowSize) st.inflow.conn = &sc.inflow - st.inflow.add(http2initialWindowSize) + st.inflow.add(sc.srv.initialStreamRecvWindowSize()) sc.streams[id] = st sc.writeSched.OpenStream(st.id, http2OpenStreamOptions{PusherID: pusherID}) @@ -4452,16 +4601,14 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead return nil, nil, err } if bodyOpen { - st.reqBuf = http2getRequestBodyBuf() - req.Body.(*http2requestBody).pipe = &http2pipe{ - b: &http2fixedBuffer{buf: st.reqBuf}, - } - if vv, ok := rp.header["Content-Length"]; ok { req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) } else { req.ContentLength = -1 } + req.Body.(*http2requestBody).pipe = &http2pipe{ + b: &http2dataBuffer{expected: req.ContentLength}, + } } return rw, req, nil } @@ -4556,24 +4703,6 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re return rw, req, nil } -var http2reqBodyCache = make(chan []byte, 8) - -func http2getRequestBodyBuf() []byte { - select { - case b := <-http2reqBodyCache: - return b - default: - return make([]byte, http2initialWindowSize) - } -} - -func http2putRequestBodyBuf(b []byte) { - select { - case http2reqBodyCache <- b: - default: - } -} - // Run on its own goroutine. func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) { didPanic := true @@ -4666,12 +4795,6 @@ func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err e case <-sc.doneServing: } } - if err == io.EOF { - if buf := st.reqBuf; buf != nil { - st.reqBuf = nil - http2putRequestBodyBuf(buf) - } - } } func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { @@ -4765,8 +4888,8 @@ func (b *http2requestBody) Read(p []byte) (n int, err error) { return } -// responseWriter is the http.ResponseWriter implementation. It's -// intentionally small (1 pointer wide) to minimize garbage. The +// responseWriter is the http.ResponseWriter implementation. It's +// intentionally small (1 pointer wide) to minimize garbage. The // responseWriterState pointer inside is zeroed at the end of a // request (in handlerDone) and calls on the responseWriter thereafter // simply crash (caller's mistake), but the much larger responseWriterState @@ -4940,7 +5063,7 @@ const http2TrailerPrefix = "Trailer:" // says you SHOULD (but not must) predeclare any trailers in the // header, the official ResponseWriter rules said trailers in Go must // be predeclared, and then we reuse the same ResponseWriter.Header() -// map to mean both Headers and Trailers. When it's time to write the +// map to mean both Headers and Trailers. When it's time to write the // Trailers, we pick out the fields of Headers that were declared as // trailers. That worked for a while, until we found the first major // user of Trailers in the wild: gRPC (using them only over http2), @@ -5920,7 +6043,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { cc.nextStreamID < math.MaxInt32 } -// onIdleTimeout is called from a time.AfterFunc goroutine. It will +// onIdleTimeout is called from a time.AfterFunc goroutine. It will // only be called when we're idle, but because we're coming from a new // goroutine, there could be a new request coming in at the same time, // so this simply calls the synchronized closeIfIdle to shut down this @@ -6003,7 +6126,6 @@ func http2commaSeparatedTrailers(req *Request) (string, error) { } if len(keys) > 0 { sort.Strings(keys) - return strings.Join(keys, ","), nil } return "", nil @@ -6804,8 +6926,7 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http return res, nil } - buf := new(bytes.Buffer) - cs.bufPipe = http2pipe{b: buf} + cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}} cs.bytesRemain = res.ContentLength res.Body = http2transportResponseBody{cs} go cs.awaitRequestCancel(cs.req)