mirror of
https://github.com/golang/go
synced 2024-11-19 02:34:44 -07:00
e93be7f42f
Change-Id: I6aa47fffcb29842e3194231e4ad4b6be4386d329 Reviewed-on: https://go-review.googlesource.com/136675 Reviewed-by: Rebecca Stambler <rstambler@golang.org>
342 lines
11 KiB
Go
342 lines
11 KiB
Go
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
// https://www.jsonrpc.org/specification
// It is intended to be compatible with other implementations at the wire level.
|
|
package jsonrpc2
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// Conn is a JSON RPC 2 client server connection.
|
|
// Conn is bidirectional; it does not have a designated server or client end.
|
|
type Conn struct {
|
|
handle Handler
|
|
cancel Canceler
|
|
log Logger
|
|
stream Stream
|
|
done chan struct{}
|
|
err error
|
|
seq int64 // must only be accessed using atomic operations
|
|
pendingMu sync.Mutex // protects the pending map
|
|
pending map[ID]chan *Response
|
|
handlingMu sync.Mutex // protects the handling map
|
|
handling map[ID]context.CancelFunc
|
|
}
|
|
|
|
// Handler is an option you can pass to NewConn to handle incoming requests.
|
|
// If the request returns true from IsNotify then the Handler should not return a
|
|
// result or error, otherwise it should handle the Request and return either
|
|
// an encoded result, or an error.
|
|
// Handlers must be concurrency-safe.
|
|
type Handler = func(context.Context, *Conn, *Request) (interface{}, *Error)
|
|
|
|
// Canceler is an option you can pass to NewConn which is invoked for
|
|
// cancelled outgoing requests.
|
|
// The request will have the ID filled in, which can be used to propagate the
|
|
// cancel to the other process if needed.
|
|
// It is okay to use the connection to send notifications, but the context will
|
|
// be in the cancelled state, so you must do it with the background context
|
|
// instead.
|
|
type Canceler = func(context.Context, *Conn, *Request)
|
|
|
|
// Logger is an option you can pass to NewConn which is invoked for
|
|
// all messages flowing through a Conn.
|
|
type Logger = func(mode string, id *ID, method string, payload *json.RawMessage, err *Error)
|
|
|
|
// NewErrorf builds a Error struct for the suppied message and code.
|
|
// If args is not empty, message and args will be passed to Sprintf.
|
|
func NewErrorf(code int64, format string, args ...interface{}) *Error {
|
|
return &Error{
|
|
Code: code,
|
|
Message: fmt.Sprintf(format, args...),
|
|
}
|
|
}
|
|
|
|
// NewConn creates a new connection object that reads and writes messages from
|
|
// the supplied stream and dispatches incoming messages to the supplied handler.
|
|
func NewConn(ctx context.Context, s Stream, options ...interface{}) *Conn {
|
|
conn := &Conn{
|
|
stream: s,
|
|
done: make(chan struct{}),
|
|
pending: make(map[ID]chan *Response),
|
|
handling: make(map[ID]context.CancelFunc),
|
|
}
|
|
for _, opt := range options {
|
|
switch opt := opt.(type) {
|
|
case Handler:
|
|
if conn.handle != nil {
|
|
panic("Duplicate Handler function in options list")
|
|
}
|
|
conn.handle = opt
|
|
case Canceler:
|
|
if conn.cancel != nil {
|
|
panic("Duplicate Canceler function in options list")
|
|
}
|
|
conn.cancel = opt
|
|
case Logger:
|
|
if conn.log != nil {
|
|
panic("Duplicate Logger function in options list")
|
|
}
|
|
conn.log = opt
|
|
default:
|
|
panic(fmt.Errorf("Unknown option type %T in options list", opt))
|
|
}
|
|
}
|
|
if conn.handle == nil {
|
|
// the default handler reports a method error
|
|
conn.handle = func(ctx context.Context, c *Conn, r *Request) (interface{}, *Error) {
|
|
return nil, NewErrorf(CodeMethodNotFound, "method %q not found", r.Method)
|
|
}
|
|
}
|
|
if conn.cancel == nil {
|
|
// the default canceller does nothing
|
|
conn.cancel = func(context.Context, *Conn, *Request) {}
|
|
}
|
|
if conn.log == nil {
|
|
// the default logger does nothing
|
|
conn.log = func(string, *ID, string, *json.RawMessage, *Error) {}
|
|
}
|
|
go func() {
|
|
conn.err = conn.run(ctx)
|
|
close(conn.done)
|
|
}()
|
|
return conn
|
|
}
|
|
|
|
// Wait blocks until the connection is terminated, and returns any error that
|
|
// cause the termination.
|
|
func (c *Conn) Wait(ctx context.Context) error {
|
|
select {
|
|
case <-c.done:
|
|
return c.err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Cancel cancels a pending Call on the server side.
|
|
// The call is identified by its id.
|
|
// JSON RPC 2 does not specify a cancel message, so cancellation support is not
|
|
// directly wired in. This method allows a higher level protocol to choose how
|
|
// to propagate the cancel.
|
|
func (c *Conn) Cancel(id ID) {
|
|
c.handlingMu.Lock()
|
|
cancel := c.handling[id]
|
|
c.handlingMu.Unlock()
|
|
if cancel != nil {
|
|
cancel()
|
|
}
|
|
}
|
|
|
|
// Notify is called to send a notification request over the connection.
|
|
// It will return as soon as the notification has been sent, as no response is
|
|
// possible.
|
|
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error {
|
|
jsonParams, err := marshalToRaw(params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling notify parameters: %v", err)
|
|
}
|
|
request := &Request{
|
|
Method: method,
|
|
Params: jsonParams,
|
|
}
|
|
data, err := json.Marshal(request)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling notify request: %v", err)
|
|
}
|
|
c.log("notify <=", nil, request.Method, request.Params, nil)
|
|
return c.stream.Write(ctx, data)
|
|
}
|
|
|
|
// Call sends a request over the connection and then waits for a response.
|
|
// If the response is not an error, it will be decoded into result.
|
|
// result must be of a type you an pass to json.Unmarshal.
|
|
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error {
|
|
jsonParams, err := marshalToRaw(params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling call parameters: %v", err)
|
|
}
|
|
// generate a new request identifier
|
|
id := ID{Number: atomic.AddInt64(&c.seq, 1)}
|
|
request := &Request{
|
|
ID: &id,
|
|
Method: method,
|
|
Params: jsonParams,
|
|
}
|
|
// marshal the request now it is complete
|
|
data, err := json.Marshal(request)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling call request: %v", err)
|
|
}
|
|
// we have to add ourselves to the pending map before we send, otherwise we
|
|
// are racing the response
|
|
rchan := make(chan *Response)
|
|
c.pendingMu.Lock()
|
|
c.pending[id] = rchan
|
|
c.pendingMu.Unlock()
|
|
defer func() {
|
|
// clean up the pending response handler on the way out
|
|
c.pendingMu.Lock()
|
|
delete(c.pending, id)
|
|
c.pendingMu.Unlock()
|
|
}()
|
|
// now we are ready to send
|
|
c.log("call <=", request.ID, request.Method, request.Params, nil)
|
|
if err := c.stream.Write(ctx, data); err != nil {
|
|
// sending failed, we will never get a response, so don't leave it pending
|
|
return err
|
|
}
|
|
// now wait for the response
|
|
select {
|
|
case response := <-rchan:
|
|
// is it an error response?
|
|
if response.Error != nil {
|
|
return response.Error
|
|
}
|
|
if result == nil || response.Result == nil {
|
|
return nil
|
|
}
|
|
if err := json.Unmarshal(*response.Result, result); err != nil {
|
|
return fmt.Errorf("unmarshalling result: %v", err)
|
|
}
|
|
return nil
|
|
case <-ctx.Done():
|
|
// allow the handler to propagate the cancel
|
|
c.cancel(ctx, c, request)
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// combined has all the fields of both Request and Response.
|
|
// We can decode this and then work out which it is.
|
|
type combined struct {
|
|
VersionTag VersionTag `json:"jsonrpc"`
|
|
ID *ID `json:"id,omitempty"`
|
|
Method string `json:"method"`
|
|
Params *json.RawMessage `json:"params,omitempty"`
|
|
Result *json.RawMessage `json:"result,omitempty"`
|
|
Error *Error `json:"error,omitempty"`
|
|
}
|
|
|
|
// Run starts a read loop on the supplied reader.
|
|
// It must be called exactly once for each Conn.
|
|
// It returns only when the reader is closed or there is an error in the stream.
|
|
func (c *Conn) run(ctx context.Context) error {
|
|
ctx, cancelRun := context.WithCancel(ctx)
|
|
for {
|
|
// get the data for a message
|
|
data, err := c.stream.Read(ctx)
|
|
if err != nil {
|
|
// the stream failed, we cannot continue
|
|
return err
|
|
}
|
|
// read a combined message
|
|
msg := &combined{}
|
|
if err := json.Unmarshal(data, msg); err != nil {
|
|
// a badly formed message arrived, log it and continue
|
|
// we trust the stream to have isolated the error to just this message
|
|
c.log("read", nil, "", nil, NewErrorf(0, "unmarshal failed: %v", err))
|
|
continue
|
|
}
|
|
// work out which kind of message we have
|
|
switch {
|
|
case msg.Method != "":
|
|
// if method is set it must be a request
|
|
request := &Request{
|
|
Method: msg.Method,
|
|
Params: msg.Params,
|
|
ID: msg.ID,
|
|
}
|
|
if request.IsNotify() {
|
|
c.log("notify =>", request.ID, request.Method, request.Params, nil)
|
|
// we have a Notify, forward to the handler in a go routine
|
|
go func() {
|
|
if _, err := c.handle(ctx, c, request); err != nil {
|
|
// notify produced an error, we can't forward it to the other side
|
|
// because there is no id, so we just log it
|
|
c.log("notify failed", nil, request.Method, nil, err)
|
|
}
|
|
}()
|
|
} else {
|
|
// we have a Call, forward to the handler in another go routine
|
|
reqCtx, cancelReq := context.WithCancel(ctx)
|
|
c.handlingMu.Lock()
|
|
c.handling[*request.ID] = cancelReq
|
|
c.handlingMu.Unlock()
|
|
go func() {
|
|
defer func() {
|
|
// clean up the cancel handler on the way out
|
|
c.handlingMu.Lock()
|
|
delete(c.handling, *request.ID)
|
|
c.handlingMu.Unlock()
|
|
cancelReq()
|
|
}()
|
|
c.log("call =>", request.ID, request.Method, request.Params, nil)
|
|
resp, callErr := c.handle(reqCtx, c, request)
|
|
var result *json.RawMessage
|
|
if result, err = marshalToRaw(resp); err != nil {
|
|
callErr = &Error{Message: err.Error()}
|
|
}
|
|
response := &Response{
|
|
Result: result,
|
|
Error: callErr,
|
|
ID: request.ID,
|
|
}
|
|
data, err := json.Marshal(response)
|
|
if err != nil {
|
|
// failure to marshal leaves the call without a response
|
|
// possibly we could attempt to respond with a different message
|
|
// but we can probably rely on timeouts instead
|
|
c.log("respond =!>", request.ID, request.Method, nil, NewErrorf(0, "%s", err))
|
|
return
|
|
}
|
|
c.log("respond =>", response.ID, "", response.Result, response.Error)
|
|
if err = c.stream.Write(ctx, data); err != nil {
|
|
// if a stream write fails, we really need to shut down the whole
|
|
// stream and return from the run
|
|
c.log("respond =!>", nil, request.Method, nil, NewErrorf(0, "%s", err))
|
|
cancelRun()
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
case msg.ID != nil:
|
|
// we have a response, get the pending entry from the map
|
|
c.pendingMu.Lock()
|
|
rchan := c.pending[*msg.ID]
|
|
if rchan != nil {
|
|
delete(c.pending, *msg.ID)
|
|
}
|
|
c.pendingMu.Unlock()
|
|
// and send the reply to the channel
|
|
response := &Response{
|
|
Result: msg.Result,
|
|
Error: msg.Error,
|
|
ID: msg.ID,
|
|
}
|
|
c.log("response =>", response.ID, "", response.Result, response.Error)
|
|
rchan <- response
|
|
close(rchan)
|
|
default:
|
|
c.log("invalid =>", nil, "", nil, NewErrorf(0, "message not a call, notify or response, ignoring"))
|
|
}
|
|
}
|
|
}
|
|
|
|
// marshalToRaw encodes obj as JSON and returns a pointer to the encoded bytes
// as a json.RawMessage. If encoding fails it returns a nil pointer and the
// error from json.Marshal.
func marshalToRaw(obj interface{}) (*json.RawMessage, error) {
	encoded, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}
	msg := new(json.RawMessage)
	*msg = encoded
	return msg, nil
}
|