1
0
mirror of https://github.com/golang/go synced 2024-11-17 21:04:43 -07:00

net/http: update bundled http2

Updates http2 to x/net/http2 git rev 1c05540f687 for:

  http2: fix format argument warnings in tests
  https://golang.org/cl/48090

  http2: retry requests after receiving REFUSED STREAM
  https://golang.org/cl/50471

  http2: block RoundTrip when the Transport hits MaxConcurrentStreams
  https://golang.org/cl/53250

Fixes #13774
Fixes #20985
Fixes #21229

Change-Id: Ie19b4a7cc395a0b7a25fac55f5051faaf94920bb
Reviewed-on: https://go-review.googlesource.com/54052
Run-TryBot: Tom Bergan <tombergan@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
This commit is contained in:
Tom Bergan 2017-08-08 18:01:08 -07:00
parent 8fb9cee3f1
commit 6b6b9f69fd

View File

@ -30,6 +30,7 @@ import (
"io/ioutil"
"log"
"math"
mathrand "math/rand"
"net"
"net/http/httptrace"
"net/textproto"
@ -6683,6 +6684,7 @@ type http2ClientConn struct {
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*http2clientStream // client-initiated
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
@ -6735,35 +6737,45 @@ type http2clientStream struct {
resTrailer *Header // client's Response.Trailer
}
// awaitRequestCancel runs in its own goroutine and waits for the user
// to cancel a RoundTrip request, its context to expire, or for the
// request to be done (any way it might be removed from the cc.streams
// map: peer reset, successful completion, TCP connection breakage,
// etc)
func (cs *http2clientStream) awaitRequestCancel(req *Request) {
// awaitRequestCancel waits for the user to cancel a request or for the done
// channel to be signaled. A non-nil error is returned only if the request was
// canceled.
func http2awaitRequestCancel(req *Request, done <-chan struct{}) error {
ctx := http2reqContext(req)
if req.Cancel == nil && ctx.Done() == nil {
return
return nil
}
select {
case <-req.Cancel:
cs.cancelStream()
cs.bufPipe.CloseWithError(http2errRequestCanceled)
return http2errRequestCanceled
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}
// awaitRequestCancel waits for the user to cancel a request, its context to
// expire, or for the request to be done (any way it might be removed from the
// cc.streams map: peer reset, successful completion, TCP connection breakage,
// etc). If the request is canceled, then cs will be canceled and closed.
func (cs *http2clientStream) awaitRequestCancel(req *Request) {
if err := http2awaitRequestCancel(req, cs.done); err != nil {
cs.cancelStream()
cs.bufPipe.CloseWithError(ctx.Err())
case <-cs.done:
cs.bufPipe.CloseWithError(err)
}
}
func (cs *http2clientStream) cancelStream() {
cs.cc.mu.Lock()
cc := cs.cc
cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
cs.cc.mu.Unlock()
cc.mu.Unlock()
if !didReset {
cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
cc.forgetStreamID(cs.ID)
}
}
@ -6848,7 +6860,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
}
addr := http2authorityAddr(req.URL.Scheme, req.URL.Host)
for {
for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
if err != nil {
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
@ -6856,9 +6868,25 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
}
http2traceGotConn(req, cc)
res, err := cc.RoundTrip(req)
if err != nil {
if req, err = http2shouldRetryRequest(req, err); err == nil {
continue
if err != nil && retry <= 6 {
afterBodyWrite := false
if e, ok := err.(http2afterReqBodyWriteError); ok {
err = e
afterBodyWrite = true
}
if req, err = http2shouldRetryRequest(req, err, afterBodyWrite); err == nil {
// After the first retry, do exponential backoff with 10% jitter.
if retry == 0 {
continue
}
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
select {
case <-time.After(time.Second * time.Duration(backoff)):
continue
case <-http2reqContext(req).Done():
return nil, http2reqContext(req).Err()
}
}
}
if err != nil {
@ -6879,43 +6907,60 @@ func (t *http2Transport) CloseIdleConnections() {
}
var (
http2errClientConnClosed = errors.New("http2: client conn is closed")
http2errClientConnUnusable = errors.New("http2: client conn not usable")
http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
http2errClientConnClosed = errors.New("http2: client conn is closed")
http2errClientConnUnusable = errors.New("http2: client conn not usable")
http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip.
// It is used to signal that err happened after part of Request.Body was sent to the server.
type http2afterReqBodyWriteError struct {
err error
}
func (e http2afterReqBodyWriteError) Error() string {
return e.err.Error() + "; some request body already written"
}
// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
switch err {
default:
func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) {
if !http2canRetryError(err) {
return nil, err
case http2errClientConnUnusable, http2errClientConnGotGoAway:
return req, nil
case http2errClientConnGotGoAwayAfterSomeReqBody:
// If the Body is nil (or http.NoBody), it's safe to reuse
// this request and its Body.
if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
}
// Otherwise we depend on the Request having its GetBody
// func defined.
getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
body, err := getBody()
if err != nil {
return nil, err
}
newReq := *req
newReq.Body = body
return &newReq, nil
}
if !afterBodyWrite {
return req, nil
}
// If the Body is nil (or http.NoBody), it's safe to reuse
// this request and its Body.
if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
}
// Otherwise we depend on the Request having its GetBody
// func defined.
getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
}
body, err := getBody()
if err != nil {
return nil, err
}
newReq := *req
newReq.Body = body
return &newReq, nil
}
func http2canRetryError(err error) bool {
if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway {
return true
}
if se, ok := err.(http2StreamError); ok {
return se.Code == http2ErrCodeRefusedStream
}
return false
}
func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
@ -7079,6 +7124,8 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
}
}
// CanTakeNewRequest reports whether the connection can take a new request,
// meaning it has not been closed or received or sent a GOAWAY.
func (cc *http2ClientConn) CanTakeNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
@ -7090,8 +7137,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
return false
}
return cc.goAway == nil && !cc.closed &&
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
cc.nextStreamID < math.MaxInt32
int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@ -7237,10 +7283,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
hasTrailers := trailers != ""
cc.mu.Lock()
cc.lastActive = time.Now()
if cc.closed || !cc.canTakeNewRequestLocked() {
if err := cc.awaitOpenSlotForRequest(req); err != nil {
cc.mu.Unlock()
return nil, http2errClientConnUnusable
return nil, err
}
body := req.Body
@ -7335,14 +7380,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
if re.err == http2errClientConnGotGoAway {
cc.mu.Lock()
if cs.startedWrite {
re.err = http2errClientConnGotGoAwayAfterSomeReqBody
}
cc.mu.Unlock()
}
cc.mu.Lock()
afterBodyWrite := cs.startedWrite
cc.mu.Unlock()
cc.forgetStreamID(cs.ID)
if afterBodyWrite {
return nil, http2afterReqBodyWriteError{re.err}
}
return nil, re.err
}
res.Request = req
@ -7355,31 +7399,31 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
case <-respHeaderTimer:
cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
cc.forgetStreamID(cs.ID)
return nil, http2errTimeout
case <-ctx.Done():
cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
cc.forgetStreamID(cs.ID)
return nil, ctx.Err()
case <-req.Cancel:
cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
cc.forgetStreamID(cs.ID)
return nil, http2errRequestCanceled
case <-cs.peerReset:
// processResetStream already removed the
@ -7406,6 +7450,45 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
}
}
// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error {
var waitingForConn chan struct{}
var waitingForConnErr error // guarded by cc.mu
for {
cc.lastActive = time.Now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return http2errClientConnUnusable
}
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
if waitingForConn != nil {
close(waitingForConn)
}
return nil
}
// Unfortunately, we cannot wait on a condition variable and channel at
// the same time, so instead, we spin up a goroutine to check if the
// request is canceled while we wait for a slot to open in the connection.
if waitingForConn == nil {
waitingForConn = make(chan struct{})
go func() {
if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
cc.mu.Lock()
waitingForConnErr = err
cc.cond.Broadcast()
cc.mu.Unlock()
}
}()
}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--
if waitingForConnErr != nil {
return waitingForConnErr
}
}
}
// requires cc.wmu be held
func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
first := true // first frame written (HEADERS is first, then CONTINUATION)
@ -7765,7 +7848,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
// Wake up checkResetOrDone via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
cc.cond.Broadcast()
}
return cs
}
@ -7864,8 +7949,9 @@ func (rl *http2clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
cs.cc.forgetStreamID(cs.ID)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
@ -8187,6 +8273,7 @@ func (b http2transportResponseBody) Close() error {
}
cs.bufPipe.BreakWithError(http2errClosedResponseBody)
cc.forgetStreamID(cs.ID)
return nil
}