1
0
mirror of https://github.com/golang/go synced 2024-09-24 03:10:16 -06:00

net/http: update bundles http2

Updates http2 to x/net/http2 git rev 906cda9 for:

http2: add configurable knobs for the server's receive window
https://golang.org/cl/37226

http2/hpack: speedup Encoder.searchTable
https://golang.org/cl/37406

http2: Add opt-in option to Framer to allow DataFrame struct reuse
https://golang.org/cl/34812

http2: replace fixedBuffer with dataBuffer
https://golang.org/cl/37400

http2/hpack: remove hpack's constant time string comparison
https://golang.org/cl/37394

Updates golang/go#16512
Updates golang/go#18404

Change-Id: I1ad7c95c404ead4ced7f85af061cf811b299a288
Reviewed-on: https://go-review.googlesource.com/37500
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
This commit is contained in:
Tom Bergan 2017-02-27 11:09:42 -08:00
parent 0df81e8887
commit 5ae7cbfff6

View File

@ -277,7 +277,7 @@ func http2filterOutClientConn(in []*http2ClientConn, exclude *http2ClientConn) [
} }
// noDialClientConnPool is an implementation of http2.ClientConnPool // noDialClientConnPool is an implementation of http2.ClientConnPool
// which never dials. We let the HTTP/1.1 client dial and use its TLS // which never dials. We let the HTTP/1.1 client dial and use its TLS
// connection instead. // connection instead.
type http2noDialClientConnPool struct{ *http2clientConnPool } type http2noDialClientConnPool struct{ *http2clientConnPool }
@ -349,6 +349,139 @@ func (rt http2noDialH2RoundTripper) RoundTrip(req *Request) (*Response, error) {
return res, err return res, err
} }
// Buffer chunks are allocated from a pool to reduce pressure on GC.
// The maximum wasted space per dataBuffer is 2x the largest size class,
// which happens when the dataBuffer has multiple chunks and there is
// one unread byte in both the first and last chunks. We use a few size
// classes to minimize overheads for servers that typically receive very
// small request bodies.
//
// TODO: Benchmark to determine if the pools are necessary. The GC may have
// improved enough that we can instead allocate chunks like this:
// make([]byte, max(16<<10, expectedBytesRemaining))
var (
http2dataChunkSizeClasses = []int{
1 << 10,
2 << 10,
4 << 10,
8 << 10,
16 << 10,
}
http2dataChunkPools = [...]sync.Pool{
{New: func() interface{} { return make([]byte, 1<<10) }},
{New: func() interface{} { return make([]byte, 2<<10) }},
{New: func() interface{} { return make([]byte, 4<<10) }},
{New: func() interface{} { return make([]byte, 8<<10) }},
{New: func() interface{} { return make([]byte, 16<<10) }},
}
)
func http2getDataBufferChunk(size int64) []byte {
i := 0
for ; i < len(http2dataChunkSizeClasses)-1; i++ {
if size <= int64(http2dataChunkSizeClasses[i]) {
break
}
}
return http2dataChunkPools[i].Get().([]byte)
}
func http2putDataBufferChunk(p []byte) {
for i, n := range http2dataChunkSizeClasses {
if len(p) == n {
http2dataChunkPools[i].Put(p)
return
}
}
panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
}
// dataBuffer is an io.ReadWriter backed by a list of data chunks.
// Each dataBuffer is used to read DATA frames on a single stream.
// The buffer is divided into chunks so the server can limit the
// total memory used by a single connection without limiting the
// request body size on any single stream.
type http2dataBuffer struct {
chunks [][]byte
r int // next byte to read is chunks[0][r]
w int // next byte to write is chunks[len(chunks)-1][w]
size int // total buffered bytes
expected int64 // we expect at least this many bytes in future Write calls (ignored if <= 0)
}
var http2errReadEmpty = errors.New("read from empty dataBuffer")
// Read copies bytes from the buffer into p.
// It is an error to read when no data is available.
func (b *http2dataBuffer) Read(p []byte) (int, error) {
if b.size == 0 {
return 0, http2errReadEmpty
}
var ntotal int
for len(p) > 0 && b.size > 0 {
readFrom := b.bytesFromFirstChunk()
n := copy(p, readFrom)
p = p[n:]
ntotal += n
b.r += n
b.size -= n
if b.r == len(b.chunks[0]) {
http2putDataBufferChunk(b.chunks[0])
end := len(b.chunks) - 1
copy(b.chunks[:end], b.chunks[1:])
b.chunks[end] = nil
b.chunks = b.chunks[:end]
b.r = 0
}
}
return ntotal, nil
}
func (b *http2dataBuffer) bytesFromFirstChunk() []byte {
if len(b.chunks) == 1 {
return b.chunks[0][b.r:b.w]
}
return b.chunks[0][b.r:]
}
// Len returns the number of bytes of the unread portion of the buffer.
func (b *http2dataBuffer) Len() int {
return b.size
}
// Write appends p to the buffer.
func (b *http2dataBuffer) Write(p []byte) (int, error) {
ntotal := len(p)
for len(p) > 0 {
want := int64(len(p))
if b.expected > want {
want = b.expected
}
chunk := b.lastChunkOrAlloc(want)
n := copy(chunk[b.w:], p)
p = p[n:]
b.w += n
b.size += n
b.expected -= int64(n)
}
return ntotal, nil
}
func (b *http2dataBuffer) lastChunkOrAlloc(want int64) []byte {
if len(b.chunks) != 0 {
last := b.chunks[len(b.chunks)-1]
if b.w < len(last) {
return last
}
}
chunk := http2getDataBufferChunk(want)
b.chunks = append(b.chunks, chunk)
b.w = 0
return chunk
}
// An ErrCode is an unsigned 32-bit error code as defined in the HTTP/2 spec. // An ErrCode is an unsigned 32-bit error code as defined in the HTTP/2 spec.
type http2ErrCode uint32 type http2ErrCode uint32
@ -469,56 +602,6 @@ var (
http2errPseudoAfterRegular = errors.New("pseudo header field after regular") http2errPseudoAfterRegular = errors.New("pseudo header field after regular")
) )
// fixedBuffer is an io.ReadWriter backed by a fixed size buffer.
// It never allocates, but moves old data as new data is written.
type http2fixedBuffer struct {
buf []byte
r, w int
}
var (
http2errReadEmpty = errors.New("read from empty fixedBuffer")
http2errWriteFull = errors.New("write on full fixedBuffer")
)
// Read copies bytes from the buffer into p.
// It is an error to read when no data is available.
func (b *http2fixedBuffer) Read(p []byte) (n int, err error) {
if b.r == b.w {
return 0, http2errReadEmpty
}
n = copy(p, b.buf[b.r:b.w])
b.r += n
if b.r == b.w {
b.r = 0
b.w = 0
}
return n, nil
}
// Len returns the number of bytes of the unread portion of the buffer.
func (b *http2fixedBuffer) Len() int {
return b.w - b.r
}
// Write copies bytes from p into the buffer.
// It is an error to write more data than the buffer can hold.
func (b *http2fixedBuffer) Write(p []byte) (n int, err error) {
if b.r > 0 && len(p) > len(b.buf)-b.w {
copy(b.buf, b.buf[b.r:b.w])
b.w -= b.r
b.r = 0
}
n = copy(b.buf[b.w:], p)
b.w += n
if n < len(p) {
err = http2errWriteFull
}
return n, err
}
// flow is the flow control window's size. // flow is the flow control window's size.
type http2flow struct { type http2flow struct {
// n is the number of DATA bytes we're allowed to send. // n is the number of DATA bytes we're allowed to send.
@ -666,7 +749,7 @@ var http2flagName = map[http2FrameType]map[http2Flags]string{
// a frameParser parses a frame given its FrameHeader and payload // a frameParser parses a frame given its FrameHeader and payload
// bytes. The length of payload will always equal fh.Length (which // bytes. The length of payload will always equal fh.Length (which
// might be 0). // might be 0).
type http2frameParser func(fh http2FrameHeader, payload []byte) (http2Frame, error) type http2frameParser func(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error)
var http2frameParsers = map[http2FrameType]http2frameParser{ var http2frameParsers = map[http2FrameType]http2frameParser{
http2FrameData: http2parseDataFrame, http2FrameData: http2parseDataFrame,
@ -861,6 +944,8 @@ type http2Framer struct {
debugFramerBuf *bytes.Buffer debugFramerBuf *bytes.Buffer
debugReadLoggerf func(string, ...interface{}) debugReadLoggerf func(string, ...interface{})
debugWriteLoggerf func(string, ...interface{}) debugWriteLoggerf func(string, ...interface{})
frameCache *http2frameCache // nil if frames aren't reused (default)
} }
func (fr *http2Framer) maxHeaderListSize() uint32 { func (fr *http2Framer) maxHeaderListSize() uint32 {
@ -937,6 +1022,27 @@ const (
http2maxFrameSize = 1<<24 - 1 http2maxFrameSize = 1<<24 - 1
) )
// SetReuseFrames allows the Framer to reuse Frames.
// If called on a Framer, Frames returned by calls to ReadFrame are only
// valid until the next call to ReadFrame.
func (fr *http2Framer) SetReuseFrames() {
if fr.frameCache != nil {
return
}
fr.frameCache = &http2frameCache{}
}
type http2frameCache struct {
dataFrame http2DataFrame
}
func (fc *http2frameCache) getDataFrame() *http2DataFrame {
if fc == nil {
return &http2DataFrame{}
}
return &fc.dataFrame
}
// NewFramer returns a Framer that writes frames to w and reads them from r. // NewFramer returns a Framer that writes frames to w and reads them from r.
func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { func http2NewFramer(w io.Writer, r io.Reader) *http2Framer {
fr := &http2Framer{ fr := &http2Framer{
@ -1016,7 +1122,7 @@ func (fr *http2Framer) ReadFrame() (http2Frame, error) {
if _, err := io.ReadFull(fr.r, payload); err != nil { if _, err := io.ReadFull(fr.r, payload); err != nil {
return nil, err return nil, err
} }
f, err := http2typeFrameParser(fh.Type)(fh, payload) f, err := http2typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
if err != nil { if err != nil {
if ce, ok := err.(http2connError); ok { if ce, ok := err.(http2connError); ok {
return nil, fr.connError(ce.Code, ce.Reason) return nil, fr.connError(ce.Code, ce.Reason)
@ -1104,14 +1210,14 @@ func (f *http2DataFrame) Data() []byte {
return f.data return f.data
} }
func http2parseDataFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) {
if fh.StreamID == 0 { if fh.StreamID == 0 {
return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"} return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"}
} }
f := &http2DataFrame{ f := fc.getDataFrame()
http2FrameHeader: fh, f.http2FrameHeader = fh
}
var padSize byte var padSize byte
if fh.Flags.Has(http2FlagDataPadded) { if fh.Flags.Has(http2FlagDataPadded) {
var err error var err error
@ -1132,6 +1238,7 @@ var (
http2errStreamID = errors.New("invalid stream ID") http2errStreamID = errors.New("invalid stream ID")
http2errDepStreamID = errors.New("invalid dependent stream ID") http2errDepStreamID = errors.New("invalid dependent stream ID")
http2errPadLength = errors.New("pad length too large") http2errPadLength = errors.New("pad length too large")
http2errPadBytes = errors.New("padding bytes must all be zeros unless AllowIllegalWrites is enabled")
) )
func http2validStreamIDOrZero(streamID uint32) bool { func http2validStreamIDOrZero(streamID uint32) bool {
@ -1155,6 +1262,7 @@ func (f *http2Framer) WriteData(streamID uint32, endStream bool, data []byte) er
// //
// If pad is nil, the padding bit is not sent. // If pad is nil, the padding bit is not sent.
// The length of pad must not exceed 255 bytes. // The length of pad must not exceed 255 bytes.
// The bytes of pad must all be zero, unless f.AllowIllegalWrites is set.
// //
// It will perform exactly one Write to the underlying Writer. // It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility not to violate the maximum frame size // It is the caller's responsibility not to violate the maximum frame size
@ -1163,8 +1271,18 @@ func (f *http2Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad
if !http2validStreamID(streamID) && !f.AllowIllegalWrites { if !http2validStreamID(streamID) && !f.AllowIllegalWrites {
return http2errStreamID return http2errStreamID
} }
if len(pad) > 255 { if len(pad) > 0 {
return http2errPadLength if len(pad) > 255 {
return http2errPadLength
}
if !f.AllowIllegalWrites {
for _, b := range pad {
if b != 0 {
return http2errPadBytes
}
}
}
} }
var flags http2Flags var flags http2Flags
if endStream { if endStream {
@ -1192,7 +1310,7 @@ type http2SettingsFrame struct {
p []byte p []byte
} }
func http2parseSettingsFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 {
return nil, http2ConnectionError(http2ErrCodeFrameSize) return nil, http2ConnectionError(http2ErrCodeFrameSize)
@ -1281,7 +1399,7 @@ type http2PingFrame struct {
func (f *http2PingFrame) IsAck() bool { return f.Flags.Has(http2FlagPingAck) } func (f *http2PingFrame) IsAck() bool { return f.Flags.Has(http2FlagPingAck) }
func http2parsePingFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { func http2parsePingFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) {
if len(payload) != 8 { if len(payload) != 8 {
return nil, http2ConnectionError(http2ErrCodeFrameSize) return nil, http2ConnectionError(http2ErrCodeFrameSize)
} }
@ -1321,7 +1439,7 @@ func (f *http2GoAwayFrame) DebugData() []byte {
return f.debugData return f.debugData
} }
func http2parseGoAwayFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseGoAwayFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if fh.StreamID != 0 { if fh.StreamID != 0 {
return nil, http2ConnectionError(http2ErrCodeProtocol) return nil, http2ConnectionError(http2ErrCodeProtocol)
} }
@ -1361,7 +1479,7 @@ func (f *http2UnknownFrame) Payload() []byte {
return f.p return f.p
} }
func http2parseUnknownFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseUnknownFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
return &http2UnknownFrame{fh, p}, nil return &http2UnknownFrame{fh, p}, nil
} }
@ -1372,7 +1490,7 @@ type http2WindowUpdateFrame struct {
Increment uint32 // never read with high bit set Increment uint32 // never read with high bit set
} }
func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if len(p) != 4 { if len(p) != 4 {
return nil, http2ConnectionError(http2ErrCodeFrameSize) return nil, http2ConnectionError(http2ErrCodeFrameSize)
} }
@ -1432,7 +1550,7 @@ func (f *http2HeadersFrame) HasPriority() bool {
return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority) return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority)
} }
func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) {
hf := &http2HeadersFrame{ hf := &http2HeadersFrame{
http2FrameHeader: fh, http2FrameHeader: fh,
} }
@ -1556,7 +1674,7 @@ type http2PriorityParam struct {
Exclusive bool Exclusive bool
// Weight is the stream's zero-indexed weight. It should be // Weight is the stream's zero-indexed weight. It should be
// set together with StreamDep, or neither should be set. Per // set together with StreamDep, or neither should be set. Per
// the spec, "Add one to the value to obtain a weight between // the spec, "Add one to the value to obtain a weight between
// 1 and 256." // 1 and 256."
Weight uint8 Weight uint8
@ -1566,7 +1684,7 @@ func (p http2PriorityParam) IsZero() bool {
return p == http2PriorityParam{} return p == http2PriorityParam{}
} }
func http2parsePriorityFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) {
if fh.StreamID == 0 { if fh.StreamID == 0 {
return nil, http2connError{http2ErrCodeProtocol, "PRIORITY frame with stream ID 0"} return nil, http2connError{http2ErrCodeProtocol, "PRIORITY frame with stream ID 0"}
} }
@ -1613,7 +1731,7 @@ type http2RSTStreamFrame struct {
ErrCode http2ErrCode ErrCode http2ErrCode
} }
func http2parseRSTStreamFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseRSTStreamFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if len(p) != 4 { if len(p) != 4 {
return nil, http2ConnectionError(http2ErrCodeFrameSize) return nil, http2ConnectionError(http2ErrCodeFrameSize)
} }
@ -1643,7 +1761,7 @@ type http2ContinuationFrame struct {
headerFragBuf []byte headerFragBuf []byte
} }
func http2parseContinuationFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { func http2parseContinuationFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if fh.StreamID == 0 { if fh.StreamID == 0 {
return nil, http2connError{http2ErrCodeProtocol, "CONTINUATION frame with stream ID 0"} return nil, http2connError{http2ErrCodeProtocol, "CONTINUATION frame with stream ID 0"}
} }
@ -1693,7 +1811,7 @@ func (f *http2PushPromiseFrame) HeadersEnded() bool {
return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders) return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders)
} }
func http2parsePushPromise(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_ http2Frame, err error) {
pp := &http2PushPromiseFrame{ pp := &http2PushPromiseFrame{
http2FrameHeader: fh, http2FrameHeader: fh,
} }
@ -2778,7 +2896,7 @@ func http2validPseudoPath(v string) bool {
return (len(v) > 0 && v[0] == '/' && (len(v) == 1 || v[1] != '/')) || v == "*" return (len(v) > 0 && v[0] == '/' && (len(v) == 1 || v[1] != '/')) || v == "*"
} }
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the // io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered) // underlying buffer is an interface. (io.Pipe is always unbuffered)
type http2pipe struct { type http2pipe struct {
@ -2980,11 +3098,38 @@ type http2Server struct {
// activity for the purposes of IdleTimeout. // activity for the purposes of IdleTimeout.
IdleTimeout time.Duration IdleTimeout time.Duration
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connections. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
// If the value is outside this range, a default value will be
// used instead.
MaxUploadBufferPerConnection int32
// MaxUploadBufferPerStream is the size of the initial flow control
// window for each stream. The HTTP/2 spec does not allow this to
// be larger than 2^32-1. If the value is zero or larger than the
// maximum, a default value will be used instead.
MaxUploadBufferPerStream int32
// NewWriteScheduler constructs a write scheduler for a connection. // NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen. // If nil, a default scheduler is chosen.
NewWriteScheduler func() http2WriteScheduler NewWriteScheduler func() http2WriteScheduler
} }
func (s *http2Server) initialConnRecvWindowSize() int32 {
if s.MaxUploadBufferPerConnection > http2initialWindowSize {
return s.MaxUploadBufferPerConnection
}
return 1 << 20
}
func (s *http2Server) initialStreamRecvWindowSize() int32 {
if s.MaxUploadBufferPerStream > 0 {
return s.MaxUploadBufferPerStream
}
return 1 << 20
}
func (s *http2Server) maxReadFrameSize() uint32 { func (s *http2Server) maxReadFrameSize() uint32 {
if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize { if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize {
return v return v
@ -3118,27 +3263,27 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
defer cancel() defer cancel()
sc := &http2serverConn{ sc := &http2serverConn{
srv: s, srv: s,
hs: opts.baseConfig(), hs: opts.baseConfig(),
conn: c, conn: c,
baseCtx: baseCtx, baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(), remoteAddrStr: c.RemoteAddr().String(),
bw: http2newBufferedWriter(c), bw: http2newBufferedWriter(c),
handler: opts.handler(), handler: opts.handler(),
streams: make(map[uint32]*http2stream), streams: make(map[uint32]*http2stream),
readFrameCh: make(chan http2readFrameResult), readFrameCh: make(chan http2readFrameResult),
wantWriteFrameCh: make(chan http2FrameWriteRequest, 8), wantWriteFrameCh: make(chan http2FrameWriteRequest, 8),
wantStartPushCh: make(chan http2startPushRequest, 8), wantStartPushCh: make(chan http2startPushRequest, 8),
wroteFrameCh: make(chan http2frameWriteResult, 1), wroteFrameCh: make(chan http2frameWriteResult, 1),
bodyReadCh: make(chan http2bodyReadMsg), bodyReadCh: make(chan http2bodyReadMsg),
doneServing: make(chan struct{}), doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, clientMaxStreams: math.MaxUint32,
advMaxStreams: s.maxConcurrentStreams(), advMaxStreams: s.maxConcurrentStreams(),
initialWindowSize: http2initialWindowSize, initialStreamSendWindowSize: http2initialWindowSize,
maxFrameSize: http2initialMaxFrameSize, maxFrameSize: http2initialMaxFrameSize,
headerTableSize: http2initialHeaderTableSize, headerTableSize: http2initialHeaderTableSize,
serveG: http2newGoroutineLock(), serveG: http2newGoroutineLock(),
pushEnabled: true, pushEnabled: true,
} }
if sc.hs.WriteTimeout != 0 { if sc.hs.WriteTimeout != 0 {
@ -3218,34 +3363,34 @@ type http2serverConn struct {
writeSched http2WriteScheduler writeSched http2WriteScheduler
// Everything following is owned by the serve loop; use serveG.check(): // Everything following is owned by the serve loop; use serveG.check():
serveG http2goroutineLock // used to verify funcs are on serve() serveG http2goroutineLock // used to verify funcs are on serve()
pushEnabled bool pushEnabled bool
sawFirstSettings bool // got the initial SETTINGS frame after the preface sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool needToSendSettingsAck bool
unackedSettings int // how many SETTINGS have we sent without ACKs? unackedSettings int // how many SETTINGS have we sent without ACKs?
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push curPushedStreams uint32 // number of open streams initiated by server push
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*http2stream streams map[uint32]*http2stream
initialWindowSize int32 initialStreamSendWindowSize int32
maxFrameSize int32 maxFrameSize int32
headerTableSize uint32 headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default) peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
writingFrame bool // started writing a frame (on serve goroutine or separate) writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode http2ErrCode goAwayCode http2ErrCode
shutdownTimerCh <-chan time.Time // nil until used shutdownTimerCh <-chan time.Time // nil until used
shutdownTimer *time.Timer // nil until used shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused idleTimer *time.Timer // nil if unused
idleTimerCh <-chan time.Time // nil if unused idleTimerCh <-chan time.Time // nil if unused
// Owned by the writeFrameAsync goroutine: // Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer headerWriteBuf bytes.Buffer
@ -3294,10 +3439,9 @@ type http2stream struct {
numTrailerValues int64 numTrailerValues int64
weight uint8 weight uint8
state http2streamState state http2streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100) wroteHeaders bool // whether we wrote headers (not status 100)
reqBuf []byte // if non-nil, body pipe buffer to return later at EOF
trailer Header // accumulated trailers trailer Header // accumulated trailers
reqTrailer Header // handler's Request.Trailer reqTrailer Header // handler's Request.Trailer
@ -3518,10 +3662,15 @@ func (sc *http2serverConn) serve() {
{http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{http2SettingMaxConcurrentStreams, sc.advMaxStreams}, {http2SettingMaxConcurrentStreams, sc.advMaxStreams},
{http2SettingMaxHeaderListSize, sc.maxHeaderListSize()}, {http2SettingMaxHeaderListSize, sc.maxHeaderListSize()},
{http2SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
}, },
}) })
sc.unackedSettings++ sc.unackedSettings++
if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
if err := sc.readPreface(); err != nil { if err := sc.readPreface(); err != nil {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return return
@ -4132,9 +4281,9 @@ func (sc *http2serverConn) processSetting(s http2Setting) error {
func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error {
sc.serveG.check() sc.serveG.check()
old := sc.initialWindowSize old := sc.initialStreamSendWindowSize
sc.initialWindowSize = int32(val) sc.initialStreamSendWindowSize = int32(val)
growth := sc.initialWindowSize - old growth := int32(val) - old
for _, st := range sc.streams { for _, st := range sc.streams {
if !st.flow.add(growth) { if !st.flow.add(growth) {
@ -4395,9 +4544,9 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState
} }
st.cw.Init() st.cw.Init()
st.flow.conn = &sc.flow st.flow.conn = &sc.flow
st.flow.add(sc.initialWindowSize) st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.conn = &sc.inflow st.inflow.conn = &sc.inflow
st.inflow.add(http2initialWindowSize) st.inflow.add(sc.srv.initialStreamRecvWindowSize())
sc.streams[id] = st sc.streams[id] = st
sc.writeSched.OpenStream(st.id, http2OpenStreamOptions{PusherID: pusherID}) sc.writeSched.OpenStream(st.id, http2OpenStreamOptions{PusherID: pusherID})
@ -4452,16 +4601,14 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead
return nil, nil, err return nil, nil, err
} }
if bodyOpen { if bodyOpen {
st.reqBuf = http2getRequestBodyBuf()
req.Body.(*http2requestBody).pipe = &http2pipe{
b: &http2fixedBuffer{buf: st.reqBuf},
}
if vv, ok := rp.header["Content-Length"]; ok { if vv, ok := rp.header["Content-Length"]; ok {
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
} else { } else {
req.ContentLength = -1 req.ContentLength = -1
} }
req.Body.(*http2requestBody).pipe = &http2pipe{
b: &http2dataBuffer{expected: req.ContentLength},
}
} }
return rw, req, nil return rw, req, nil
} }
@ -4556,24 +4703,6 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re
return rw, req, nil return rw, req, nil
} }
var http2reqBodyCache = make(chan []byte, 8)
func http2getRequestBodyBuf() []byte {
select {
case b := <-http2reqBodyCache:
return b
default:
return make([]byte, http2initialWindowSize)
}
}
func http2putRequestBodyBuf(b []byte) {
select {
case http2reqBodyCache <- b:
default:
}
}
// Run on its own goroutine. // Run on its own goroutine.
func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) { func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) {
didPanic := true didPanic := true
@ -4666,12 +4795,6 @@ func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err e
case <-sc.doneServing: case <-sc.doneServing:
} }
} }
if err == io.EOF {
if buf := st.reqBuf; buf != nil {
st.reqBuf = nil
http2putRequestBodyBuf(buf)
}
}
} }
func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) {
@ -4765,8 +4888,8 @@ func (b *http2requestBody) Read(p []byte) (n int, err error) {
return return
} }
// responseWriter is the http.ResponseWriter implementation. It's // responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The // intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a // responseWriterState pointer inside is zeroed at the end of a
// request (in handlerDone) and calls on the responseWriter thereafter // request (in handlerDone) and calls on the responseWriter thereafter
// simply crash (caller's mistake), but the much larger responseWriterState // simply crash (caller's mistake), but the much larger responseWriterState
@ -4940,7 +5063,7 @@ const http2TrailerPrefix = "Trailer:"
// says you SHOULD (but not must) predeclare any trailers in the // says you SHOULD (but not must) predeclare any trailers in the
// header, the official ResponseWriter rules said trailers in Go must // header, the official ResponseWriter rules said trailers in Go must
// be predeclared, and then we reuse the same ResponseWriter.Header() // be predeclared, and then we reuse the same ResponseWriter.Header()
// map to mean both Headers and Trailers. When it's time to write the // map to mean both Headers and Trailers. When it's time to write the
// Trailers, we pick out the fields of Headers that were declared as // Trailers, we pick out the fields of Headers that were declared as
// trailers. That worked for a while, until we found the first major // trailers. That worked for a while, until we found the first major
// user of Trailers in the wild: gRPC (using them only over http2), // user of Trailers in the wild: gRPC (using them only over http2),
@ -5920,7 +6043,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
cc.nextStreamID < math.MaxInt32 cc.nextStreamID < math.MaxInt32
} }
// onIdleTimeout is called from a time.AfterFunc goroutine. It will // onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new // only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time, // goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this // so this simply calls the synchronized closeIfIdle to shut down this
@ -6003,7 +6126,6 @@ func http2commaSeparatedTrailers(req *Request) (string, error) {
} }
if len(keys) > 0 { if len(keys) > 0 {
sort.Strings(keys) sort.Strings(keys)
return strings.Join(keys, ","), nil return strings.Join(keys, ","), nil
} }
return "", nil return "", nil
@ -6804,8 +6926,7 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
return res, nil return res, nil
} }
buf := new(bytes.Buffer) cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bufPipe = http2pipe{b: buf}
cs.bytesRemain = res.ContentLength cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs} res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req) go cs.awaitRequestCancel(cs.req)