diff --git a/src/pkg/http/persist.go b/src/pkg/http/persist.go index 8bfc097558..000a4200e5 100644 --- a/src/pkg/http/persist.go +++ b/src/pkg/http/persist.go @@ -6,14 +6,17 @@ package http import ( "bufio" - "container/list" "io" "net" + "net/textproto" "os" "sync" ) -var ErrPersistEOF = &ProtocolError{"persistent connection closed"} +var ( + ErrPersistEOF = &ProtocolError{"persistent connection closed"} + ErrPipeline = &ProtocolError{"pipeline error"} +) // A ServerConn reads requests and sends responses over an underlying // connection, until the HTTP keepalive logic commands an end. ServerConn @@ -26,8 +29,10 @@ type ServerConn struct { r *bufio.Reader clsd bool // indicates a graceful close re, we os.Error // read/write errors - lastBody io.ReadCloser + lastbody io.ReadCloser nread, nwritten int + pipe textproto.Pipeline + pipereq map[*Request]uint lk sync.Mutex // protected read/write to re,we } @@ -37,7 +42,7 @@ func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn { if r == nil { r = bufio.NewReader(c) } - return &ServerConn{c: c, r: r} + return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)} } // Close detaches the ServerConn and returns the underlying connection as well @@ -57,10 +62,25 @@ func (sc *ServerConn) Close() (c net.Conn, r *bufio.Reader) { // Read returns the next request on the wire. An ErrPersistEOF is returned if // it is gracefully determined that there are no more requests (e.g. after the // first request on an HTTP/1.0 connection, or after a Connection:close on a -// HTTP/1.1 connection). Read can be called concurrently with Write, but not -// with another Read. +// HTTP/1.1 connection). func (sc *ServerConn) Read() (req *Request, err os.Error) { + // Ensure ordered execution of Reads and Writes + id := sc.pipe.Next() + sc.pipe.StartRequest(id) + defer func() { + sc.pipe.EndRequest(id) + if req == nil { + sc.pipe.StartResponse(id) + sc.pipe.EndResponse(id) + } else { + // Remember the pipeline id of this request + sc.lk.Lock() + sc.pipereq[req] = id + sc.lk.Unlock() + } + }() + sc.lk.Lock() if sc.we != nil { // no point receiving if write-side broken or closed defer sc.lk.Unlock() @@ -73,12 +93,12 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { sc.lk.Unlock() // Make sure body is fully consumed, even if user does not call body.Close - if sc.lastBody != nil { + if sc.lastbody != nil { // body.Close is assumed to be idempotent and multiple calls to // it should return the error that its first invokation // returned. - err = sc.lastBody.Close() - sc.lastBody = nil + err = sc.lastbody.Close() + sc.lastbody = nil if err != nil { sc.lk.Lock() defer sc.lk.Unlock() @@ -102,7 +122,7 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { return } } - sc.lastBody = req.Body + sc.lastbody = req.Body sc.nread++ if req.Close { sc.lk.Lock() @@ -121,11 +141,24 @@ func (sc *ServerConn) Pending() int { return sc.nread - sc.nwritten } -// Write writes a repsonse. To close the connection gracefully, set the +// Write writes resp in response to req. To close the connection gracefully, set the // Response.Close field to true. Write should be considered operational until // it returns an error, regardless of any errors returned on the Read side. -// Write can be called concurrently with Read, but not with another Write. -func (sc *ServerConn) Write(resp *Response) os.Error { +func (sc *ServerConn) Write(req *Request, resp *Response) os.Error { + + // Retrieve the pipeline ID of this request/response pair + sc.lk.Lock() + id, ok := sc.pipereq[req] + sc.pipereq[req] = 0, false + if !ok { + sc.lk.Unlock() + return ErrPipeline + } + sc.lk.Unlock() + + // Ensure pipeline order + sc.pipe.StartResponse(id) + defer sc.pipe.EndResponse(id) sc.lk.Lock() if sc.we != nil { @@ -166,10 +199,11 @@ type ClientConn struct { c net.Conn r *bufio.Reader re, we os.Error // read/write errors - lastBody io.ReadCloser + lastbody io.ReadCloser nread, nwritten int - reqm list.List // request methods in order of execution - lk sync.Mutex // protects read/write to reqm,re,we + pipe textproto.Pipeline + pipereq map[*Request]uint + lk sync.Mutex // protects read/write to re,we,pipereq,etc. } // NewClientConn returns a new ClientConn reading and writing c. If r is not @@ -178,7 +212,7 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn { if r == nil { r = bufio.NewReader(c) } - return &ClientConn{c: c, r: r} + return &ClientConn{c: c, r: r, pipereq: make(map[*Request]uint)} } // Close detaches the ClientConn and returns the underlying connection as well @@ -191,7 +225,6 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) { r = cc.r cc.c = nil cc.r = nil - cc.reqm.Init() cc.lk.Unlock() return } @@ -201,8 +234,23 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) { // keepalive connection is logically closed after this request and the opposing // server is informed. An ErrUnexpectedEOF indicates the remote closed the // underlying TCP connection, which is usually considered as graceful close. -// Write can be called concurrently with Read, but not with another Write. -func (cc *ClientConn) Write(req *Request) os.Error { +func (cc *ClientConn) Write(req *Request) (err os.Error) { + + // Ensure ordered execution of Writes + id := cc.pipe.Next() + cc.pipe.StartRequest(id) + defer func() { + cc.pipe.EndRequest(id) + if err != nil { + cc.pipe.StartResponse(id) + cc.pipe.EndResponse(id) + } else { + // Remember the pipeline id of this request + cc.lk.Lock() + cc.pipereq[req] = id + cc.lk.Unlock() + } + }() cc.lk.Lock() if cc.re != nil { // no point sending if read-side closed or broken @@ -223,7 +271,7 @@ func (cc *ClientConn) Write(req *Request) os.Error { cc.lk.Unlock() } - err := req.Write(cc.c) + err = req.Write(cc.c) if err != nil { cc.lk.Lock() defer cc.lk.Unlock() @@ -231,9 +279,6 @@ func (cc *ClientConn) Write(req *Request) os.Error { return err } cc.nwritten++ - cc.lk.Lock() - cc.reqm.PushBack(req.Method) - cc.lk.Unlock() return nil } @@ -250,7 +295,21 @@ func (cc *ClientConn) Pending() int { // returned together with an ErrPersistEOF, which means that the remote // requested that this be the last request serviced. Read can be called // concurrently with Write, but not with another Read. -func (cc *ClientConn) Read() (resp *Response, err os.Error) { +func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) { + + // Retrieve the pipeline ID of this request/response pair + cc.lk.Lock() + id, ok := cc.pipereq[req] + cc.pipereq[req] = 0, false + if !ok { + cc.lk.Unlock() + return nil, ErrPipeline + } + cc.lk.Unlock() + + // Ensure pipeline order + cc.pipe.StartResponse(id) + defer cc.pipe.EndResponse(id) cc.lk.Lock() if cc.re != nil { @@ -259,17 +318,13 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } cc.lk.Unlock() - if cc.nread >= cc.nwritten { - return nil, os.NewError("persist client pipe count") - } - // Make sure body is fully consumed, even if user does not call body.Close - if cc.lastBody != nil { + if cc.lastbody != nil { // body.Close is assumed to be idempotent and multiple calls to // it should return the error that its first invokation // returned. - err = cc.lastBody.Close() - cc.lastBody = nil + err = cc.lastbody.Close() + cc.lastbody = nil if err != nil { cc.lk.Lock() defer cc.lk.Unlock() @@ -278,18 +333,14 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } } - cc.lk.Lock() - m := cc.reqm.Front() - cc.reqm.Remove(m) - cc.lk.Unlock() - resp, err = ReadResponse(cc.r, m.Value.(string)) + resp, err = ReadResponse(cc.r, req.Method) if err != nil { cc.lk.Lock() defer cc.lk.Unlock() cc.re = err return } - cc.lastBody = resp.Body + cc.lastbody = resp.Body cc.nread++ @@ -301,3 +352,12 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) { } return } + +// Do is convenience method that writes a request and reads a response. +func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) { + err = cc.Write(req) + if err != nil { + return + } + return cc.Read(req) +} diff --git a/src/pkg/http/serve_test.go b/src/pkg/http/serve_test.go index 80ad86290d..5594d512ad 100644 --- a/src/pkg/http/serve_test.go +++ b/src/pkg/http/serve_test.go @@ -192,7 +192,7 @@ func TestHostHandlers(t *testing.T) { t.Errorf("writing request: %v", err) continue } - r, err := cc.Read() + r, err := cc.Read(&req) if err != nil { t.Errorf("reading response: %v", err) continue