mirror of
https://github.com/golang/go
synced 2024-10-01 01:48:32 -06:00
434f7a8fef
The previous implementation was exposing the details of the wire format and resulted in non idomatic go, detecting the presence of absence of values in fields to deterimine the message type. Now the messages are distinct types and we use type switches instead. Request still exists as an interface to expose the shared behaviour of Call and Notification, as this is the type accepted by handlers. The set of messages is deliberately closed by using a private methods on the interfaces. Change-Id: I2cf15ee3923ef4688670c62896f81f760c77fe04 Reviewed-on: https://go-review.googlesource.com/c/tools/+/228719 Run-TryBot: Ian Cottrell <iancottrell@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Robert Findley <rfindley@google.com>
239 lines
6.9 KiB
Go
239 lines
6.9 KiB
Go
// Copyright 2018 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
|
|
// https://www.jsonrpc.org/specification
|
|
// It is intended to be compatible with other implementations at the wire level.
|
|
package jsonrpc2
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"golang.org/x/tools/internal/lsp/debug/tag"
|
|
"golang.org/x/tools/internal/telemetry/event"
|
|
)
|
|
|
|
const (
|
|
// ErrIdleTimeout is returned when serving timed out waiting for new connections.
|
|
ErrIdleTimeout = constError("timed out waiting for new connections")
|
|
|
|
// ErrDisconnected signals that the stream or connection exited normally.
|
|
ErrDisconnected = constError("disconnected")
|
|
)
|
|
|
|
// Conn is a JSON RPC 2 client server connection.
|
|
// Conn is bidirectional; it does not have a designated server or client end.
|
|
type Conn struct {
|
|
seq int64 // must only be accessed using atomic operations
|
|
stream Stream
|
|
pendingMu sync.Mutex // protects the pending map
|
|
pending map[ID]chan *Response
|
|
}
|
|
|
|
type constError string
|
|
|
|
func (e constError) Error() string { return string(e) }
|
|
|
|
// NewConn creates a new connection object around the supplied stream.
|
|
// You must call Run for the connection to be active.
|
|
func NewConn(s Stream) *Conn {
|
|
conn := &Conn{
|
|
stream: s,
|
|
pending: make(map[ID]chan *Response),
|
|
}
|
|
return conn
|
|
}
|
|
|
|
// Notify is called to send a notification request over the connection.
|
|
// It will return as soon as the notification has been sent, as no response is
|
|
// possible.
|
|
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
|
|
notify, err := NewNotification(method, params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling notify parameters: %v", err)
|
|
}
|
|
data, err := json.Marshal(notify)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling notify request: %v", err)
|
|
}
|
|
ctx, done := event.StartSpan(ctx, method,
|
|
tag.Method.Of(method),
|
|
tag.RPCDirection.Of(tag.Outbound),
|
|
)
|
|
defer func() {
|
|
recordStatus(ctx, err)
|
|
done()
|
|
}()
|
|
|
|
event.Record(ctx, tag.Started.Of(1))
|
|
n, err := c.stream.Write(ctx, data)
|
|
event.Record(ctx, tag.SentBytes.Of(n))
|
|
return err
|
|
}
|
|
|
|
// Call sends a request over the connection and then waits for a response.
|
|
// If the response is not an error, it will be decoded into result.
|
|
// result must be of a type you an pass to json.Unmarshal.
|
|
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) {
|
|
// generate a new request identifier
|
|
id := ID{number: atomic.AddInt64(&c.seq, 1)}
|
|
call, err := NewCall(id, method, params)
|
|
if err != nil {
|
|
return id, fmt.Errorf("marshaling call parameters: %v", err)
|
|
}
|
|
// marshal the request now it is complete
|
|
data, err := json.Marshal(call)
|
|
if err != nil {
|
|
return id, fmt.Errorf("marshaling call request: %v", err)
|
|
}
|
|
ctx, done := event.StartSpan(ctx, method,
|
|
tag.Method.Of(method),
|
|
tag.RPCDirection.Of(tag.Outbound),
|
|
tag.RPCID.Of(fmt.Sprintf("%q", id)),
|
|
)
|
|
defer func() {
|
|
recordStatus(ctx, err)
|
|
done()
|
|
}()
|
|
event.Record(ctx, tag.Started.Of(1))
|
|
// We have to add ourselves to the pending map before we send, otherwise we
|
|
// are racing the response. Also add a buffer to rchan, so that if we get a
|
|
// wire response between the time this call is cancelled and id is deleted
|
|
// from c.pending, the send to rchan will not block.
|
|
rchan := make(chan *Response, 1)
|
|
c.pendingMu.Lock()
|
|
c.pending[id] = rchan
|
|
c.pendingMu.Unlock()
|
|
defer func() {
|
|
c.pendingMu.Lock()
|
|
delete(c.pending, id)
|
|
c.pendingMu.Unlock()
|
|
}()
|
|
// now we are ready to send
|
|
n, err := c.stream.Write(ctx, data)
|
|
event.Record(ctx, tag.SentBytes.Of(n))
|
|
if err != nil {
|
|
// sending failed, we will never get a response, so don't leave it pending
|
|
return id, err
|
|
}
|
|
// now wait for the response
|
|
select {
|
|
case response := <-rchan:
|
|
// is it an error response?
|
|
if response.err != nil {
|
|
return id, response.err
|
|
}
|
|
if result == nil || len(response.result) == 0 {
|
|
return id, nil
|
|
}
|
|
if err := json.Unmarshal(response.result, result); err != nil {
|
|
return id, fmt.Errorf("unmarshaling result: %v", err)
|
|
}
|
|
return id, nil
|
|
case <-ctx.Done():
|
|
return id, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func replier(conn *Conn, req Request, spanDone func()) Replier {
|
|
return func(ctx context.Context, result interface{}, err error) error {
|
|
defer func() {
|
|
recordStatus(ctx, err)
|
|
spanDone()
|
|
}()
|
|
call, ok := req.(*Call)
|
|
if !ok {
|
|
// request was a notify, no need to respond
|
|
return nil
|
|
}
|
|
response, err := NewResponse(call.id, result, err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
data, err := json.Marshal(response)
|
|
n, err := conn.stream.Write(ctx, data)
|
|
event.Record(ctx, tag.SentBytes.Of(n))
|
|
|
|
if err != nil {
|
|
// TODO(iancottrell): if a stream write fails, we really need to shut down
|
|
// the whole stream
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Run blocks until the connection is terminated, and returns any error that
|
|
// caused the termination.
|
|
// It must be called exactly once for each Conn.
|
|
// It returns only when the reader is closed or there is an error in the stream.
|
|
func (c *Conn) Run(runCtx context.Context, handler Handler) error {
|
|
for {
|
|
// get the data for a message
|
|
data, n, err := c.stream.Read(runCtx)
|
|
if err != nil {
|
|
// The stream failed, we cannot continue. If the client disconnected
|
|
// normally, we should get ErrDisconnected here.
|
|
return err
|
|
}
|
|
// read a combined message
|
|
msg, err := DecodeMessage(data)
|
|
if err != nil {
|
|
// a badly formed message arrived, log it and continue
|
|
// we trust the stream to have isolated the error to just this message
|
|
continue
|
|
}
|
|
switch msg := msg.(type) {
|
|
case Request:
|
|
tags := []event.Tag{
|
|
tag.Method.Of(msg.Method()),
|
|
tag.RPCDirection.Of(tag.Inbound),
|
|
{}, // reserved for ID if present
|
|
}
|
|
if call, ok := msg.(*Call); ok {
|
|
tags[len(tags)-1] = tag.RPCID.Of(fmt.Sprintf("%q", call.ID()))
|
|
} else {
|
|
tags = tags[:len(tags)-1]
|
|
}
|
|
reqCtx, spanDone := event.StartSpan(runCtx, msg.Method(), tags...)
|
|
event.Record(reqCtx,
|
|
tag.Started.Of(1),
|
|
tag.ReceivedBytes.Of(n))
|
|
if err := handler(reqCtx, replier(c, msg, spanDone), msg); err != nil {
|
|
// delivery failed, not much we can do
|
|
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
|
|
}
|
|
case *Response:
|
|
// If method is not set, this should be a response, in which case we must
|
|
// have an id to send the response back to the caller.
|
|
c.pendingMu.Lock()
|
|
rchan, ok := c.pending[msg.id]
|
|
c.pendingMu.Unlock()
|
|
if ok {
|
|
rchan <- msg
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func marshalToRaw(obj interface{}) (json.RawMessage, error) {
|
|
data, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return json.RawMessage{}, err
|
|
}
|
|
return json.RawMessage(data), nil
|
|
}
|
|
|
|
func recordStatus(ctx context.Context, err error) {
|
|
if err != nil {
|
|
event.Label(ctx, tag.StatusCode.Of("ERROR"))
|
|
} else {
|
|
event.Label(ctx, tag.StatusCode.Of("OK"))
|
|
}
|
|
}
|