// Copyright 2018 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec. // https://www.jsonrpc.org/specification // It is intended to be compatible with other implementations at the wire level. package jsonrpc2 import ( "context" "encoding/json" "fmt" "sync" "sync/atomic" ) // Conn is a JSON RPC 2 client server connection. // Conn is bidirectional; it does not have a designated server or client end. type Conn struct { handle Handler cancel Canceler log Logger stream Stream done chan struct{} err error seq int64 // must only be accessed using atomic operations pendingMu sync.Mutex // protects the pending map pending map[ID]chan *Response handlingMu sync.Mutex // protects the handling map handling map[ID]context.CancelFunc } // Handler is an option you can pass to NewConn to handle incoming requests. // If the request returns true from IsNotify then the Handler should not return a // result or error, otherwise it should handle the Request and return either // an encoded result, or an error. // Handlers must be concurrency-safe. type Handler = func(context.Context, *Conn, *Request) (interface{}, *Error) // Canceler is an option you can pass to NewConn which is invoked for // cancelled outgoing requests. // The request will have the ID filled in, which can be used to propagate the // cancel to the other process if needed. // It is okay to use the connection to send notifications, but the context will // be in the cancelled state, so you must do it with the background context // instead. type Canceler = func(context.Context, *Conn, *Request) // Logger is an option you can pass to NewConn which is invoked for // all messages flowing through a Conn. type Logger = func(mode string, id *ID, method string, payload *json.RawMessage, err *Error) // NewErrorf builds a Error struct for the suppied message and code. // If args is not empty, message and args will be passed to Sprintf. func NewErrorf(code int64, format string, args ...interface{}) *Error { return &Error{ Code: code, Message: fmt.Sprintf(format, args...), } } // NewConn creates a new connection object that reads and writes messages from // the supplied stream and dispatches incoming messages to the supplied handler. func NewConn(ctx context.Context, s Stream, options ...interface{}) *Conn { conn := &Conn{ stream: s, done: make(chan struct{}), pending: make(map[ID]chan *Response), handling: make(map[ID]context.CancelFunc), } for _, opt := range options { switch opt := opt.(type) { case Handler: if conn.handle != nil { panic("Duplicate Handler function in options list") } conn.handle = opt case Canceler: if conn.cancel != nil { panic("Duplicate Canceler function in options list") } conn.cancel = opt case Logger: if conn.log != nil { panic("Duplicate Logger function in options list") } conn.log = opt default: panic(fmt.Errorf("Unknown option type %T in options list", opt)) } } if conn.handle == nil { // the default handler reports a method error conn.handle = func(ctx context.Context, c *Conn, r *Request) (interface{}, *Error) { return nil, NewErrorf(CodeMethodNotFound, "method %q not found", r.Method) } } if conn.cancel == nil { // the default canceller does nothing conn.cancel = func(context.Context, *Conn, *Request) {} } if conn.log == nil { // the default logger does nothing conn.log = func(string, *ID, string, *json.RawMessage, *Error) {} } go func() { conn.err = conn.run(ctx) close(conn.done) }() return conn } // Wait blocks until the connection is terminated, and returns any error that // cause the termination. func (c *Conn) Wait(ctx context.Context) error { select { case <-c.done: return c.err case <-ctx.Done(): return ctx.Err() } } // Cancel cancels a pending Call on the server side. // The call is identified by its id. // JSON RPC 2 does not specify a cancel message, so cancellation support is not // directly wired in. This method allows a higher level protocol to choose how // to propagate the cancel. func (c *Conn) Cancel(id ID) { c.handlingMu.Lock() cancel := c.handling[id] c.handlingMu.Unlock() if cancel != nil { cancel() } } // Notify is called to send a notification request over the connection. // It will return as soon as the notification has been sent, as no response is // possible. func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error { jsonParams, err := marshalToRaw(params) if err != nil { return fmt.Errorf("marshalling notify parameters: %v", err) } request := &Request{ Method: method, Params: jsonParams, } data, err := json.Marshal(request) if err != nil { return fmt.Errorf("marshalling notify request: %v", err) } c.log("notify <=", nil, request.Method, request.Params, nil) return c.stream.Write(ctx, data) } // Call sends a request over the connection and then waits for a response. // If the response is not an error, it will be decoded into result. // result must be of a type you an pass to json.Unmarshal. func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error { jsonParams, err := marshalToRaw(params) if err != nil { return fmt.Errorf("marshalling call parameters: %v", err) } // generate a new request identifier id := ID{Number: atomic.AddInt64(&c.seq, 1)} request := &Request{ ID: &id, Method: method, Params: jsonParams, } // marshal the request now it is complete data, err := json.Marshal(request) if err != nil { return fmt.Errorf("marshalling call request: %v", err) } // we have to add ourselves to the pending map before we send, otherwise we // are racing the response rchan := make(chan *Response) c.pendingMu.Lock() c.pending[id] = rchan c.pendingMu.Unlock() defer func() { // clean up the pending response handler on the way out c.pendingMu.Lock() delete(c.pending, id) c.pendingMu.Unlock() }() // now we are ready to send c.log("call <=", request.ID, request.Method, request.Params, nil) if err := c.stream.Write(ctx, data); err != nil { // sending failed, we will never get a response, so don't leave it pending return err } // now wait for the response select { case response := <-rchan: // is it an error response? if response.Error != nil { return response.Error } if result == nil || response.Result == nil { return nil } if err := json.Unmarshal(*response.Result, result); err != nil { return fmt.Errorf("unmarshalling result: %v", err) } return nil case <-ctx.Done(): // allow the handler to propagate the cancel c.cancel(ctx, c, request) return ctx.Err() } } // combined has all the fields of both Request and Response. // We can decode this and then work out which it is. type combined struct { VersionTag VersionTag `json:"jsonrpc"` ID *ID `json:"id,omitempty"` Method string `json:"method"` Params *json.RawMessage `json:"params,omitempty"` Result *json.RawMessage `json:"result,omitempty"` Error *Error `json:"error,omitempty"` } // Run starts a read loop on the supplied reader. // It must be called exactly once for each Conn. // It returns only when the reader is closed or there is an error in the stream. func (c *Conn) run(ctx context.Context) error { ctx, cancelRun := context.WithCancel(ctx) for { // get the data for a message data, err := c.stream.Read(ctx) if err != nil { // the stream failed, we cannot continue return err } // read a combined message msg := &combined{} if err := json.Unmarshal(data, msg); err != nil { // a badly formed message arrived, log it and continue // we trust the stream to have isolated the error to just this message c.log("read", nil, "", nil, NewErrorf(0, "unmarshal failed: %v", err)) continue } // work out which kind of message we have switch { case msg.Method != "": // if method is set it must be a request request := &Request{ Method: msg.Method, Params: msg.Params, ID: msg.ID, } if request.IsNotify() { c.log("notify =>", request.ID, request.Method, request.Params, nil) // we have a Notify, forward to the handler in a go routine go func() { if _, err := c.handle(ctx, c, request); err != nil { // notify produced an error, we can't forward it to the other side // because there is no id, so we just log it c.log("notify failed", nil, request.Method, nil, err) } }() } else { // we have a Call, forward to the handler in another go routine reqCtx, cancelReq := context.WithCancel(ctx) c.handlingMu.Lock() c.handling[*request.ID] = cancelReq c.handlingMu.Unlock() go func() { defer func() { // clean up the cancel handler on the way out c.handlingMu.Lock() delete(c.handling, *request.ID) c.handlingMu.Unlock() cancelReq() }() c.log("call =>", request.ID, request.Method, request.Params, nil) resp, callErr := c.handle(reqCtx, c, request) var result *json.RawMessage if result, err = marshalToRaw(resp); err != nil { callErr = &Error{Message: err.Error()} } response := &Response{ Result: result, Error: callErr, ID: request.ID, } data, err := json.Marshal(response) if err != nil { // failure to marshal leaves the call without a response // possibly we could attempt to respond with a different message // but we can probably rely on timeouts instead c.log("respond =!>", request.ID, request.Method, nil, NewErrorf(0, "%s", err)) return } c.log("respond =>", response.ID, "", response.Result, response.Error) if err = c.stream.Write(ctx, data); err != nil { // if a stream write fails, we really need to shut down the whole // stream and return from the run c.log("respond =!>", nil, request.Method, nil, NewErrorf(0, "%s", err)) cancelRun() return } }() } case msg.ID != nil: // we have a response, get the pending entry from the map c.pendingMu.Lock() rchan := c.pending[*msg.ID] if rchan != nil { delete(c.pending, *msg.ID) } c.pendingMu.Unlock() // and send the reply to the channel response := &Response{ Result: msg.Result, Error: msg.Error, ID: msg.ID, } c.log("response =>", response.ID, "", response.Result, response.Error) rchan <- response close(rchan) default: c.log("invalid =>", nil, "", nil, NewErrorf(0, "message not a call, notify or response, ignoring")) } } } func marshalToRaw(obj interface{}) (*json.RawMessage, error) { data, err := json.Marshal(obj) if err != nil { return nil, err } raw := json.RawMessage(data) return &raw, nil }