mirror of
https://github.com/golang/go
synced 2024-11-18 20:04:52 -07:00
94339b8328
Delivering each message in a goroutine turned out to be problematic: there are some messages that must be fully processed before later messages are started, and there was no way to guarantee that. We now push concurrency handling up to the higher level; this has the disadvantage of not being able to guarantee we respond to call messages correctly, but it's a small price to pay. The LSP currently processes each message fully in order, blocking the handler, while we still work on basic functionality. Change-Id: If0648c77713ddbe4fed69da97a57696f433b8002 Reviewed-on: https://go-review.googlesource.com/c/149497 Run-TryBot: Ian Cottrell <iancottrell@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Rebecca Stambler <rstambler@golang.org>
359 lines
11 KiB
Go
359 lines
11 KiB
Go
// Copyright 2018 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
|
|
// https://www.jsonrpc.org/specification
|
|
// It is intended to be compatible with other implementations at the wire level.
|
|
package jsonrpc2
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Conn is a JSON RPC 2 client server connection.
|
|
// Conn is bidirectional; it does not have a designated server or client end.
|
|
type Conn struct {
|
|
handle Handler
|
|
cancel Canceler
|
|
log Logger
|
|
stream Stream
|
|
done chan struct{}
|
|
err error
|
|
seq int64 // must only be accessed using atomic operations
|
|
pendingMu sync.Mutex // protects the pending map
|
|
pending map[ID]chan *Response
|
|
handlingMu sync.Mutex // protects the handling map
|
|
handling map[ID]handling
|
|
}
|
|
|
|
// Handler is an option you can pass to NewConn to handle incoming requests.
|
|
// If the request returns false from IsNotify then the Handler must eventually
|
|
// call Reply on the Conn with the supplied request.
|
|
// Handlers are called synchronously, they should pass the work off to a go
|
|
// routine if they are going to take a long time.
|
|
type Handler func(context.Context, *Conn, *Request)
|
|
|
|
// Canceler is an option you can pass to NewConn which is invoked for
|
|
// cancelled outgoing requests.
|
|
// The request will have the ID filled in, which can be used to propagate the
|
|
// cancel to the other process if needed.
|
|
// It is okay to use the connection to send notifications, but the context will
|
|
// be in the cancelled state, so you must do it with the background context
|
|
// instead.
|
|
type Canceler func(context.Context, *Conn, *Request)
|
|
|
|
// NewErrorf builds a Error struct for the suppied message and code.
|
|
// If args is not empty, message and args will be passed to Sprintf.
|
|
func NewErrorf(code int64, format string, args ...interface{}) *Error {
|
|
return &Error{
|
|
Code: code,
|
|
Message: fmt.Sprintf(format, args...),
|
|
}
|
|
}
|
|
|
|
// NewConn creates a new connection object that reads and writes messages from
|
|
// the supplied stream and dispatches incoming messages to the supplied handler.
|
|
func NewConn(ctx context.Context, s Stream, options ...interface{}) *Conn {
|
|
conn := &Conn{
|
|
stream: s,
|
|
done: make(chan struct{}),
|
|
pending: make(map[ID]chan *Response),
|
|
handling: make(map[ID]handling),
|
|
}
|
|
for _, opt := range options {
|
|
switch opt := opt.(type) {
|
|
case Handler:
|
|
if conn.handle != nil {
|
|
panic("Duplicate Handler function in options list")
|
|
}
|
|
conn.handle = opt
|
|
case Canceler:
|
|
if conn.cancel != nil {
|
|
panic("Duplicate Canceler function in options list")
|
|
}
|
|
conn.cancel = opt
|
|
case Logger:
|
|
if conn.log != nil {
|
|
panic("Duplicate Logger function in options list")
|
|
}
|
|
conn.log = opt
|
|
default:
|
|
panic(fmt.Errorf("Unknown option type %T in options list", opt))
|
|
}
|
|
}
|
|
if conn.handle == nil {
|
|
// the default handler reports a method error
|
|
conn.handle = func(ctx context.Context, c *Conn, r *Request) {
|
|
if r.IsNotify() {
|
|
c.Reply(ctx, r, nil, NewErrorf(CodeMethodNotFound, "method %q not found", r.Method))
|
|
}
|
|
}
|
|
}
|
|
if conn.cancel == nil {
|
|
// the default canceller does nothing
|
|
conn.cancel = func(context.Context, *Conn, *Request) {}
|
|
}
|
|
if conn.log == nil {
|
|
// the default logger does nothing
|
|
conn.log = func(Direction, *ID, time.Duration, string, *json.RawMessage, *Error) {}
|
|
}
|
|
go func() {
|
|
conn.err = conn.run(ctx)
|
|
close(conn.done)
|
|
}()
|
|
return conn
|
|
}
|
|
|
|
// Wait blocks until the connection is terminated, and returns any error that
|
|
// cause the termination.
|
|
func (c *Conn) Wait(ctx context.Context) error {
|
|
select {
|
|
case <-c.done:
|
|
return c.err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Cancel cancels a pending Call on the server side.
|
|
// The call is identified by its id.
|
|
// JSON RPC 2 does not specify a cancel message, so cancellation support is not
|
|
// directly wired in. This method allows a higher level protocol to choose how
|
|
// to propagate the cancel.
|
|
func (c *Conn) Cancel(id ID) {
|
|
c.handlingMu.Lock()
|
|
handling, found := c.handling[id]
|
|
c.handlingMu.Unlock()
|
|
if found {
|
|
handling.cancel()
|
|
}
|
|
}
|
|
|
|
// Notify is called to send a notification request over the connection.
|
|
// It will return as soon as the notification has been sent, as no response is
|
|
// possible.
|
|
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error {
|
|
jsonParams, err := marshalToRaw(params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling notify parameters: %v", err)
|
|
}
|
|
request := &Request{
|
|
Method: method,
|
|
Params: jsonParams,
|
|
}
|
|
data, err := json.Marshal(request)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling notify request: %v", err)
|
|
}
|
|
c.log(Send, nil, -1, request.Method, request.Params, nil)
|
|
return c.stream.Write(ctx, data)
|
|
}
|
|
|
|
// Call sends a request over the connection and then waits for a response.
|
|
// If the response is not an error, it will be decoded into result.
|
|
// result must be of a type you an pass to json.Unmarshal.
|
|
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error {
|
|
jsonParams, err := marshalToRaw(params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling call parameters: %v", err)
|
|
}
|
|
// generate a new request identifier
|
|
id := ID{Number: atomic.AddInt64(&c.seq, 1)}
|
|
request := &Request{
|
|
ID: &id,
|
|
Method: method,
|
|
Params: jsonParams,
|
|
}
|
|
// marshal the request now it is complete
|
|
data, err := json.Marshal(request)
|
|
if err != nil {
|
|
return fmt.Errorf("marshalling call request: %v", err)
|
|
}
|
|
// we have to add ourselves to the pending map before we send, otherwise we
|
|
// are racing the response
|
|
rchan := make(chan *Response)
|
|
c.pendingMu.Lock()
|
|
c.pending[id] = rchan
|
|
c.pendingMu.Unlock()
|
|
defer func() {
|
|
// clean up the pending response handler on the way out
|
|
c.pendingMu.Lock()
|
|
delete(c.pending, id)
|
|
c.pendingMu.Unlock()
|
|
}()
|
|
// now we are ready to send
|
|
before := time.Now()
|
|
c.log(Send, request.ID, -1, request.Method, request.Params, nil)
|
|
if err := c.stream.Write(ctx, data); err != nil {
|
|
// sending failed, we will never get a response, so don't leave it pending
|
|
return err
|
|
}
|
|
// now wait for the response
|
|
select {
|
|
case response := <-rchan:
|
|
elapsed := time.Since(before)
|
|
c.log(Send, response.ID, elapsed, request.Method, response.Result, response.Error)
|
|
// is it an error response?
|
|
if response.Error != nil {
|
|
return response.Error
|
|
}
|
|
if result == nil || response.Result == nil {
|
|
return nil
|
|
}
|
|
if err := json.Unmarshal(*response.Result, result); err != nil {
|
|
return fmt.Errorf("unmarshalling result: %v", err)
|
|
}
|
|
return nil
|
|
case <-ctx.Done():
|
|
// allow the handler to propagate the cancel
|
|
c.cancel(ctx, c, request)
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Reply sends a reply to the given request.
|
|
// It is an error to call this if request was not a call.
|
|
// You must call this exactly once for any given request.
|
|
// If err is set then result will be ignored.
|
|
func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error {
|
|
if req.IsNotify() {
|
|
return fmt.Errorf("reply not invoked with a valid call")
|
|
}
|
|
c.handlingMu.Lock()
|
|
handling, found := c.handling[*req.ID]
|
|
if found {
|
|
delete(c.handling, *req.ID)
|
|
}
|
|
c.handlingMu.Unlock()
|
|
if !found {
|
|
return fmt.Errorf("not a call in progress: %v", req.ID)
|
|
}
|
|
|
|
elapsed := time.Since(handling.start)
|
|
var raw *json.RawMessage
|
|
if err == nil {
|
|
raw, err = marshalToRaw(result)
|
|
}
|
|
response := &Response{
|
|
Result: raw,
|
|
ID: req.ID,
|
|
}
|
|
if err != nil {
|
|
if callErr, ok := err.(*Error); ok {
|
|
response.Error = callErr
|
|
} else {
|
|
response.Error = NewErrorf(0, "%s", err)
|
|
}
|
|
}
|
|
data, err := json.Marshal(response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.log(Send, response.ID, elapsed, req.Method, response.Result, response.Error)
|
|
if err = c.stream.Write(ctx, data); err != nil {
|
|
// TODO(iancottrell): if a stream write fails, we really need to shut down
|
|
// the whole stream
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type handling struct {
|
|
request *Request
|
|
cancel context.CancelFunc
|
|
start time.Time
|
|
}
|
|
|
|
// combined has all the fields of both Request and Response.
|
|
// We can decode this and then work out which it is.
|
|
type combined struct {
|
|
VersionTag VersionTag `json:"jsonrpc"`
|
|
ID *ID `json:"id,omitempty"`
|
|
Method string `json:"method"`
|
|
Params *json.RawMessage `json:"params,omitempty"`
|
|
Result *json.RawMessage `json:"result,omitempty"`
|
|
Error *Error `json:"error,omitempty"`
|
|
}
|
|
|
|
// Run starts a read loop on the supplied reader.
|
|
// It must be called exactly once for each Conn.
|
|
// It returns only when the reader is closed or there is an error in the stream.
|
|
func (c *Conn) run(ctx context.Context) error {
|
|
for {
|
|
// get the data for a message
|
|
data, err := c.stream.Read(ctx)
|
|
if err != nil {
|
|
// the stream failed, we cannot continue
|
|
return err
|
|
}
|
|
// read a combined message
|
|
msg := &combined{}
|
|
if err := json.Unmarshal(data, msg); err != nil {
|
|
// a badly formed message arrived, log it and continue
|
|
// we trust the stream to have isolated the error to just this message
|
|
c.log(Receive, nil, -1, "", nil, NewErrorf(0, "unmarshal failed: %v", err))
|
|
continue
|
|
}
|
|
// work out which kind of message we have
|
|
switch {
|
|
case msg.Method != "":
|
|
// if method is set it must be a request
|
|
request := &Request{
|
|
Method: msg.Method,
|
|
Params: msg.Params,
|
|
ID: msg.ID,
|
|
}
|
|
if request.IsNotify() {
|
|
c.log(Receive, request.ID, -1, request.Method, request.Params, nil)
|
|
// we have a Notify, forward to the handler in a go routine
|
|
c.handle(ctx, c, request)
|
|
} else {
|
|
// we have a Call, forward to the handler in another go routine
|
|
reqCtx, cancelReq := context.WithCancel(ctx)
|
|
c.handlingMu.Lock()
|
|
c.handling[*request.ID] = handling{
|
|
request: request,
|
|
cancel: cancelReq,
|
|
start: time.Now(),
|
|
}
|
|
c.handlingMu.Unlock()
|
|
c.log(Receive, request.ID, -1, request.Method, request.Params, nil)
|
|
c.handle(reqCtx, c, request)
|
|
}
|
|
case msg.ID != nil:
|
|
// we have a response, get the pending entry from the map
|
|
c.pendingMu.Lock()
|
|
rchan := c.pending[*msg.ID]
|
|
if rchan != nil {
|
|
delete(c.pending, *msg.ID)
|
|
}
|
|
c.pendingMu.Unlock()
|
|
// and send the reply to the channel
|
|
response := &Response{
|
|
Result: msg.Result,
|
|
Error: msg.Error,
|
|
ID: msg.ID,
|
|
}
|
|
rchan <- response
|
|
close(rchan)
|
|
default:
|
|
c.log(Receive, nil, -1, "", nil, NewErrorf(0, "message not a call, notify or response, ignoring"))
|
|
}
|
|
}
|
|
}
|
|
|
|
func marshalToRaw(obj interface{}) (*json.RawMessage, error) {
|
|
data, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
raw := json.RawMessage(data)
|
|
return &raw, nil
|
|
}
|