1
0
mirror of https://github.com/golang/go synced 2024-09-30 16:08:36 -06:00

internal/jsonrpc2: remove the OnReply callbacks

The same behavior can now be achieved by wrapping the Replier as we traverse the
handler stack instead.
Request is no longer mutated during handler invocation.

Change-Id: I2213de500f39e048f3f80161e234f3ae30464d70
Reviewed-on: https://go-review.googlesource.com/c/tools/+/227918
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-04-10 10:56:00 -04:00
parent 6a72e3782c
commit ce2627f346
2 changed files with 21 additions and 31 deletions

View File

@ -31,8 +31,15 @@ func MethodNotFound(ctx context.Context, reply Replier, r *Request) error {
// not call Reply for every request that it is passed.
func MustReplyHandler(handler Handler) Handler {
return func(ctx context.Context, reply Replier, req *Request) error {
err := handler(ctx, reply, req)
if req.done != nil {
called := false
err := handler(ctx, func(ctx context.Context, result interface{}, err error) error {
if called {
panic(fmt.Errorf("request %q replied to more than once", req.Method))
}
called = true
return reply(ctx, result, err)
}, req)
if !called {
panic(fmt.Errorf("request %q was never replied to", req.Method))
}
return err
@ -51,11 +58,13 @@ func CancelHandler(handler Handler) (Handler, func(id ID)) {
mu.Lock()
handling[*req.ID] = cancel
mu.Unlock()
req.OnReply(func() {
innerReply := reply
reply = func(ctx context.Context, result interface{}, err error) error {
mu.Lock()
delete(handling, *req.ID)
mu.Unlock()
})
return innerReply(ctx, result, err)
}
}
return handler(ctx, reply, req)
}
@ -82,7 +91,11 @@ func AsyncHandler(handler Handler) Handler {
waitForPrevious := nextRequest
nextRequest = make(chan struct{})
unlockNext := nextRequest
req.OnReply(func() { close(unlockNext) })
innerReply := reply
reply = func(ctx context.Context, result interface{}, err error) error {
close(unlockNext)
return innerReply(ctx, result, err)
}
_, queueDone := event.StartSpan(ctx, "queued")
go func() {
<-waitForPrevious

View File

@ -39,10 +39,6 @@ type Conn struct {
// Request is sent to a server to represent a Call or Notify operaton.
type Request struct {
conn *Conn
// done holds set of callbacks added by OnReply, and is set back to nil if
// Reply has been called.
done []func()
// The Wire values of the request.
wireRequest
}
@ -170,18 +166,11 @@ func (r *Request) IsNotify() bool {
return r.ID == nil
}
func replier(r *Request) Replier {
func replier(r *Request, spanDone func()) Replier {
return func(ctx context.Context, result interface{}, err error) error {
if r.done == nil {
return fmt.Errorf("reply invoked more than once")
}
defer func() {
recordStatus(ctx, err)
for i := len(r.done); i > 0; i-- {
r.done[i-1]()
}
r.done = nil
spanDone()
}()
if r.IsNotify() {
@ -225,17 +214,6 @@ func replier(r *Request) Replier {
}
}
// OnReply adds a done callback to the request.
// All added callbacks are invoked during the one required call to Reply, and
// then dropped.
// It is an error to call this after Reply.
// This call is not safe for concurrent use, but should only be invoked by
// handlers and in general only one handler should be working on a request
// at any time.
func (r *Request) OnReply(do func()) {
r.done = append(r.done, do)
}
// Run blocks until the connection is terminated, and returns any error that
// caused the termination.
// It must be called exactly once for each Conn.
@ -278,9 +256,8 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
ID: msg.ID,
},
}
req.OnReply(func() { spanDone() })
if err := handler(reqCtx, replier(req), req); err != nil {
if err := handler(reqCtx, replier(req, spanDone), req); err != nil {
// delivery failed, not much we can do
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
}