diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index cdbc3dce7c5..f1c1a1f3c92 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -300,6 +300,8 @@ func (r *Request) Parallel() { // You must call this exactly once for any given request. // It should only be called from the handler go routine. // If err is set then result will be ignored. +// If the request has not yet dropped into parallel mode +// it will be before this function returns. func (r *Request) Reply(ctx context.Context, result interface{}, err error) error { if r.state >= requestReplied { return fmt.Errorf("reply invoked more than once") @@ -310,6 +312,10 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro ctx, st := trace.StartSpan(ctx, r.Method+":reply", trace.WithSpanKind(trace.SpanKindClient)) defer st.End() + // reply ends the handling phase of a call, so if we are not yet + // parallel we should be now. The go routine is allowed to continue + // to do work after replying, which is why it is important to unlock + // the rpc system at this point. r.Parallel() r.state = requestReplied @@ -380,6 +386,9 @@ type combined struct { // It must be called exactly once for each Conn. // It returns only when the reader is closed or there is an error in the stream. func (c *Conn) Run(ctx context.Context) error { + // we need to make the next request "lock" in an unlocked state to allow + // the first incoming request to proceed. All later requests are unlocked + // by the preceding request going to parallel mode. nextRequest := make(chan struct{}) close(nextRequest) for {