internal/jsonrpc2: change jsonrpc2.Conn to be an interface
This will allow varying implementations and wrappers, and more closely matches the concepts used in the net library.

Change-Id: I4be4c6efb3def0eda2693f482cbb0c6f776e5642
Reviewed-on: https://go-review.googlesource.com/c/tools/+/232877
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
Commit: d3396bb197
Parent: 1c0c6cf43e
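The practical effect of the change is that callers depend on an interface, so alternative implementations and wrappers can stand in for the concrete connection. Below is a minimal sketch of such a wrapper, assuming only the Conn interface defined in conn.go below; loggingConn is illustrative and not part of this commit, and since the package is internal to x/tools the sketch only compiles inside that module.

package example

import (
    "context"
    "log"

    "golang.org/x/tools/internal/jsonrpc2"
)

// loggingConn wraps any jsonrpc2.Conn and logs outgoing calls.
// The embedded interface supplies Notify, Go, Close, Done and Err unchanged.
type loggingConn struct {
    jsonrpc2.Conn
}

func (c loggingConn) Call(ctx context.Context, method string, params, result interface{}) (jsonrpc2.ID, error) {
    log.Printf("jsonrpc2 call: %s", method)
    return c.Conn.Call(ctx, method, params, result)
}

This kind of interface embedding is the same pattern the dispatcher structs further down use when they switch from an embedded *jsonrpc2.Conn to an embedded jsonrpc2.Conn.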
internal/jsonrpc2/conn.go (new file, 262 lines)
@@ -0,0 +1,262 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package jsonrpc2

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "sync/atomic"

    "golang.org/x/tools/internal/event"
    "golang.org/x/tools/internal/event/label"
    "golang.org/x/tools/internal/lsp/debug/tag"
)

// Conn is the common interface to jsonrpc clients and servers.
// Conn is bidirectional; it does not have a designated server or client end.
// It manages the jsonrpc2 protocol, connecting responses back to their calls.
type Conn interface {
    // Call invokes the target method and waits for a response.
    // The params will be marshaled to JSON before sending over the wire, and will
    // be handed to the method invoked.
    // The response will be unmarshaled from JSON into the result.
    // The id returned will be unique from this connection, and can be used for
    // logging or tracking.
    Call(ctx context.Context, method string, params, result interface{}) (ID, error)

    // Notify invokes the target method but does not wait for a response.
    // The params will be marshaled to JSON before sending over the wire, and will
    // be handed to the method invoked.
    Notify(ctx context.Context, method string, params interface{}) error

    // Go starts a goroutine to handle the connection.
    // It must be called exactly once for each Conn.
    // It returns immediately.
    // You must block on Done() to wait for the connection to shut down.
    // This is a temporary measure; this should be started automatically in the
    // future.
    Go(ctx context.Context, handler Handler)

    // Close closes the connection and its underlying stream.
    // It does not wait for the close to complete; use the Done() channel for
    // that.
    Close() error

    // Done returns a channel that will be closed when the processing goroutine
    // has terminated, which will happen if Close() is called or an underlying
    // stream is closed.
    Done() <-chan struct{}

    // Err returns an error if there was one from within the processing goroutine.
    // If err returns non-nil, the connection will be already closed or closing.
    Err() error
}

type conn struct {
    seq       int64      // must only be accessed using atomic operations
    writeMu   sync.Mutex // protects writes to the stream
    stream    Stream
    pendingMu sync.Mutex // protects the pending map
    pending   map[ID]chan *Response

    done chan struct{}
    err  atomic.Value
}

// NewConn creates a new connection object around the supplied stream.
func NewConn(s Stream) Conn {
    conn := &conn{
        stream:  s,
        pending: make(map[ID]chan *Response),
        done:    make(chan struct{}),
    }
    return conn
}

func (c *conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
    notify, err := NewNotification(method, params)
    if err != nil {
        return fmt.Errorf("marshaling notify parameters: %v", err)
    }
    ctx, done := event.Start(ctx, method,
        tag.Method.Of(method),
        tag.RPCDirection.Of(tag.Outbound),
    )
    defer func() {
        recordStatus(ctx, err)
        done()
    }()

    event.Metric(ctx, tag.Started.Of(1))
    n, err := c.write(ctx, notify)
    event.Metric(ctx, tag.SentBytes.Of(n))
    return err
}

func (c *conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) {
    // generate a new request identifier
    id := ID{number: atomic.AddInt64(&c.seq, 1)}
    call, err := NewCall(id, method, params)
    if err != nil {
        return id, fmt.Errorf("marshaling call parameters: %v", err)
    }
    ctx, done := event.Start(ctx, method,
        tag.Method.Of(method),
        tag.RPCDirection.Of(tag.Outbound),
        tag.RPCID.Of(fmt.Sprintf("%q", id)),
    )
    defer func() {
        recordStatus(ctx, err)
        done()
    }()
    event.Metric(ctx, tag.Started.Of(1))
    // We have to add ourselves to the pending map before we send, otherwise we
    // are racing the response. Also add a buffer to rchan, so that if we get a
    // wire response between the time this call is cancelled and id is deleted
    // from c.pending, the send to rchan will not block.
    rchan := make(chan *Response, 1)
    c.pendingMu.Lock()
    c.pending[id] = rchan
    c.pendingMu.Unlock()
    defer func() {
        c.pendingMu.Lock()
        delete(c.pending, id)
        c.pendingMu.Unlock()
    }()
    // now we are ready to send
    n, err := c.write(ctx, call)
    event.Metric(ctx, tag.SentBytes.Of(n))
    if err != nil {
        // sending failed, we will never get a response, so don't leave it pending
        return id, err
    }
    // now wait for the response
    select {
    case response := <-rchan:
        // is it an error response?
        if response.err != nil {
            return id, response.err
        }
        if result == nil || len(response.result) == 0 {
            return id, nil
        }
        if err := json.Unmarshal(response.result, result); err != nil {
            return id, fmt.Errorf("unmarshaling result: %v", err)
        }
        return id, nil
    case <-ctx.Done():
        return id, ctx.Err()
    }
}

func (c *conn) replier(req Request, spanDone func()) Replier {
    return func(ctx context.Context, result interface{}, err error) error {
        defer func() {
            recordStatus(ctx, err)
            spanDone()
        }()
        call, ok := req.(*Call)
        if !ok {
            // request was a notify, no need to respond
            return nil
        }
        response, err := NewResponse(call.id, result, err)
        if err != nil {
            return err
        }
        n, err := c.write(ctx, response)
        event.Metric(ctx, tag.SentBytes.Of(n))
        if err != nil {
            // TODO(iancottrell): if a stream write fails, we really need to shut down
            // the whole stream
            return err
        }
        return nil
    }
}

func (c *conn) write(ctx context.Context, msg Message) (int64, error) {
    c.writeMu.Lock()
    defer c.writeMu.Unlock()
    return c.stream.Write(ctx, msg)
}

func (c *conn) Go(ctx context.Context, handler Handler) {
    go c.run(ctx, handler)
}

func (c *conn) run(ctx context.Context, handler Handler) {
    defer close(c.done)
    for {
        // get the next message
        msg, n, err := c.stream.Read(ctx)
        if err != nil {
            // The stream failed, we cannot continue.
            c.fail(err)
            return
        }
        switch msg := msg.(type) {
        case Request:
            labels := []label.Label{
                tag.Method.Of(msg.Method()),
                tag.RPCDirection.Of(tag.Inbound),
                {}, // reserved for ID if present
            }
            if call, ok := msg.(*Call); ok {
                labels[len(labels)-1] = tag.RPCID.Of(fmt.Sprintf("%q", call.ID()))
            } else {
                labels = labels[:len(labels)-1]
            }
            reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...)
            event.Metric(reqCtx,
                tag.Started.Of(1),
                tag.ReceivedBytes.Of(n))
            if err := handler(reqCtx, c.replier(msg, spanDone), msg); err != nil {
                // delivery failed, not much we can do
                event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
            }
        case *Response:
            // If method is not set, this should be a response, in which case we must
            // have an id to send the response back to the caller.
            c.pendingMu.Lock()
            rchan, ok := c.pending[msg.id]
            c.pendingMu.Unlock()
            if ok {
                rchan <- msg
            }
        }
    }
}

func (c *conn) Close() error {
    return c.stream.Close()
}

func (c *conn) Done() <-chan struct{} {
    return c.done
}

func (c *conn) Err() error {
    if err := c.err.Load(); err != nil {
        return err.(error)
    }
    return nil
}

// fail sets a failure condition on the stream and closes it.
func (c *conn) fail(err error) {
    c.err.Store(err)
    c.stream.Close()
}

func recordStatus(ctx context.Context, err error) {
    if err != nil {
        event.Label(ctx, tag.StatusCode.Of("ERROR"))
    } else {
        event.Label(ctx, tag.StatusCode.Of("OK"))
    }
}
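For orientation, here is a hedged sketch of how this connection is driven end to end, using only names that appear in this commit; NewHeaderStream and NewConn are used the same way by the test helpers further down, while the handler, the RPC method, and the params are placeholders.

package example

import (
    "context"
    "log"
    "net"

    "golang.org/x/tools/internal/jsonrpc2"
)

// useConn is a hypothetical caller; handler is any jsonrpc2.Handler.
func useConn(ctx context.Context, netConn net.Conn, handler jsonrpc2.Handler) {
    conn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn))
    conn.Go(ctx, handler) // must be called exactly once; returns immediately

    var result interface{}
    id, err := conn.Call(ctx, "textDocument/hover", map[string]interface{}{}, &result)
    if err != nil {
        log.Printf("call %v failed: %v", id, err)
    }

    _ = conn.Notify(ctx, "initialized", struct{}{}) // no response expected

    conn.Close()  // does not wait for shutdown
    <-conn.Done() // blocks until the processing goroutine has exited
    if err := conn.Err(); err != nil {
        log.Printf("connection ended with error: %v", err)
    }
}

Note the ordering baked into Call above: the pending channel is registered before the request is written, and the channel is buffered, so a response that races a cancellation can neither be lost nor block the read loop.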
@@ -7,259 +7,11 @@
// It is intended to be compatible with other implementations at the wire level.
package jsonrpc2

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "sync/atomic"

    "golang.org/x/tools/internal/event"
    "golang.org/x/tools/internal/event/label"
    "golang.org/x/tools/internal/lsp/debug/tag"
)

const (
    // ErrIdleTimeout is returned when serving timed out waiting for new connections.
    ErrIdleTimeout = constError("timed out waiting for new connections")
)

// Conn is a JSON RPC 2 client server connection.
// Conn is bidirectional; it does not have a designated server or client end.
type Conn struct {
    seq       int64      // must only be accessed using atomic operations
    writeMu   sync.Mutex // protects writes to the stream
    stream    Stream
    pendingMu sync.Mutex // protects the pending map
    pending   map[ID]chan *Response

    done chan struct{}
    err  atomic.Value
}

type constError string

func (e constError) Error() string { return string(e) }

// NewConn creates a new connection object around the supplied stream.
// You must call Run for the connection to be active.
func NewConn(s Stream) *Conn {
    conn := &Conn{
        stream:  s,
        pending: make(map[ID]chan *Response),
        done:    make(chan struct{}),
    }
    return conn
}

// Notify is called to send a notification request over the connection.
// It will return as soon as the notification has been sent, as no response is
// possible.
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
    notify, err := NewNotification(method, params)
    if err != nil {
        return fmt.Errorf("marshaling notify parameters: %v", err)
    }
    ctx, done := event.Start(ctx, method,
        tag.Method.Of(method),
        tag.RPCDirection.Of(tag.Outbound),
    )
    defer func() {
        recordStatus(ctx, err)
        done()
    }()

    event.Metric(ctx, tag.Started.Of(1))
    n, err := c.write(ctx, notify)
    event.Metric(ctx, tag.SentBytes.Of(n))
    return err
}

// Call sends a request over the connection and then waits for a response.
// If the response is not an error, it will be decoded into result.
// result must be of a type you can pass to json.Unmarshal.
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) {
    // generate a new request identifier
    id := ID{number: atomic.AddInt64(&c.seq, 1)}
    call, err := NewCall(id, method, params)
    if err != nil {
        return id, fmt.Errorf("marshaling call parameters: %v", err)
    }
    ctx, done := event.Start(ctx, method,
        tag.Method.Of(method),
        tag.RPCDirection.Of(tag.Outbound),
        tag.RPCID.Of(fmt.Sprintf("%q", id)),
    )
    defer func() {
        recordStatus(ctx, err)
        done()
    }()
    event.Metric(ctx, tag.Started.Of(1))
    // We have to add ourselves to the pending map before we send, otherwise we
    // are racing the response. Also add a buffer to rchan, so that if we get a
    // wire response between the time this call is cancelled and id is deleted
    // from c.pending, the send to rchan will not block.
    rchan := make(chan *Response, 1)
    c.pendingMu.Lock()
    c.pending[id] = rchan
    c.pendingMu.Unlock()
    defer func() {
        c.pendingMu.Lock()
        delete(c.pending, id)
        c.pendingMu.Unlock()
    }()
    // now we are ready to send
    n, err := c.write(ctx, call)
    event.Metric(ctx, tag.SentBytes.Of(n))
    if err != nil {
        // sending failed, we will never get a response, so don't leave it pending
        return id, err
    }
    // now wait for the response
    select {
    case response := <-rchan:
        // is it an error response?
        if response.err != nil {
            return id, response.err
        }
        if result == nil || len(response.result) == 0 {
            return id, nil
        }
        if err := json.Unmarshal(response.result, result); err != nil {
            return id, fmt.Errorf("unmarshaling result: %v", err)
        }
        return id, nil
    case <-ctx.Done():
        return id, ctx.Err()
    }
}

func replier(conn *Conn, req Request, spanDone func()) Replier {
    return func(ctx context.Context, result interface{}, err error) error {
        defer func() {
            recordStatus(ctx, err)
            spanDone()
        }()
        call, ok := req.(*Call)
        if !ok {
            // request was a notify, no need to respond
            return nil
        }
        response, err := NewResponse(call.id, result, err)
        if err != nil {
            return err
        }
        n, err := conn.write(ctx, response)
        event.Metric(ctx, tag.SentBytes.Of(n))
        if err != nil {
            // TODO(iancottrell): if a stream write fails, we really need to shut down
            // the whole stream
            return err
        }
        return nil
    }
}

func (c *Conn) write(ctx context.Context, msg Message) (int64, error) {
    c.writeMu.Lock()
    defer c.writeMu.Unlock()
    return c.stream.Write(ctx, msg)
}

// Go starts a goroutine to handle the connection.
// It must be called exactly once for each Conn.
// It returns immediately.
// You must block on Done() to wait for the connection to shut down.
// This is a temporary measure; this should be started automatically in the
// future.
func (c *Conn) Go(ctx context.Context, handler Handler) {
    go c.run(ctx, handler)
}

func (c *Conn) run(ctx context.Context, handler Handler) {
    defer close(c.done)
    for {
        // get the next message
        msg, n, err := c.stream.Read(ctx)
        if err != nil {
            // The stream failed, we cannot continue.
            c.fail(err)
            return
        }
        switch msg := msg.(type) {
        case Request:
            labels := []label.Label{
                tag.Method.Of(msg.Method()),
                tag.RPCDirection.Of(tag.Inbound),
                {}, // reserved for ID if present
            }
            if call, ok := msg.(*Call); ok {
                labels[len(labels)-1] = tag.RPCID.Of(fmt.Sprintf("%q", call.ID()))
            } else {
                labels = labels[:len(labels)-1]
            }
            reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...)
            event.Metric(reqCtx,
                tag.Started.Of(1),
                tag.ReceivedBytes.Of(n))
            if err := handler(reqCtx, replier(c, msg, spanDone), msg); err != nil {
                // delivery failed, not much we can do
                event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
            }
        case *Response:
            // If method is not set, this should be a response, in which case we must
            // have an id to send the response back to the caller.
            c.pendingMu.Lock()
            rchan, ok := c.pending[msg.id]
            c.pendingMu.Unlock()
            if ok {
                rchan <- msg
            }
        }
    }
}

// Close closes the underlying stream.
// This does not wait for the underlying handler to finish; block on the done
// channel with <-Done() for that purpose.
func (c *Conn) Close() error {
    return c.stream.Close()
}

// Done returns a channel that will be closed when the processing goroutine has
// terminated, which will happen if Close() is called or the underlying
// stream is closed.
func (c *Conn) Done() <-chan struct{} {
    return c.done
}

// Err returns an error if there was one from within the processing goroutine.
// If err returns non-nil, the connection will be already closed or closing.
func (c *Conn) Err() error {
    if err := c.err.Load(); err != nil {
        return err.(error)
    }
    return nil
}

// fail sets a failure condition on the stream and closes it.
func (c *Conn) fail(err error) {
    c.err.Store(err)
    c.stream.Close()
}

func marshalToRaw(obj interface{}) (json.RawMessage, error) {
    data, err := json.Marshal(obj)
    if err != nil {
        return json.RawMessage{}, err
    }
    return json.RawMessage(data), nil
}

func recordStatus(ctx context.Context, err error) {
    if err != nil {
        event.Label(ctx, tag.StatusCode.Of("ERROR"))
    } else {
        event.Label(ctx, tag.StatusCode.Of("OK"))
    }
}
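ErrIdleTimeout uses the constError pattern above so callers can treat it as a comparable sentinel. A minimal sketch of such a check, where serveUntilIdle is hypothetical and only jsonrpc2.ErrIdleTimeout comes from the package:

// serveUntilIdle is a placeholder for a serve loop that gives up after idling.
if err := serveUntilIdle(ctx); err == jsonrpc2.ErrIdleTimeout {
    log.Println("no new connections; shutting down cleanly")
} else if err != nil {
    log.Fatalf("server failed: %v", err)
}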
@@ -90,7 +90,7 @@ func TestCall(t *testing.T) {
     }
 }
 
-func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Conn, *jsonrpc2.Conn, func()) {
+func prepare(ctx context.Context, t *testing.T, withHeaders bool) (jsonrpc2.Conn, jsonrpc2.Conn, func()) {
     // make a wait group that can be used to wait for the system to shut down
     aPipe, bPipe := net.Pipe()
     a := run(ctx, withHeaders, aPipe)
@@ -103,7 +103,7 @@ func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Con
     }
 }
 
-func run(ctx context.Context, withHeaders bool, nc net.Conn) *jsonrpc2.Conn {
+func run(ctx context.Context, withHeaders bool, nc net.Conn) jsonrpc2.Conn {
     var stream jsonrpc2.Stream
     if withHeaders {
         stream = jsonrpc2.NewHeaderStream(nc)
@@ -225,3 +225,11 @@ func DecodeMessage(data []byte) (Message, error) {
     }
     return call, nil
 }
+
+func marshalToRaw(obj interface{}) (json.RawMessage, error) {
+    data, err := json.Marshal(obj)
+    if err != nil {
+        return json.RawMessage{}, err
+    }
+    return json.RawMessage(data), nil
+}
@@ -17,7 +17,7 @@ import (
 
 // Connector is the interface used to connect to a server.
 type Connector interface {
-    Connect(context.Context) *jsonrpc2.Conn
+    Connect(context.Context) jsonrpc2.Conn
 }
 
 // TCPServer is a helper for executing tests against a remote jsonrpc2
@@ -48,7 +48,7 @@ func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer, framer json
 
 // Connect dials the test server and returns a jsonrpc2 Connection that is
 // ready for use.
-func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn {
+func (s *TCPServer) Connect(ctx context.Context) jsonrpc2.Conn {
     netConn, err := net.Dial("tcp", s.Addr)
     if err != nil {
         panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err))
@@ -81,7 +81,7 @@ func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer, framer jso
 }
 
 // Connect creates new io.Pipes and binds them to the underlying StreamServer.
-func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn {
+func (s *PipeServer) Connect(ctx context.Context) jsonrpc2.Conn {
     sPipe, cPipe := net.Pipe()
     s.cls.add(func() {
         sPipe.Close()
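A hedged sketch of how these test helpers are typically wired together, based only on the signatures visible above; server, framer, and handler are placeholders, and NewTCPServer is assumed to return the *TCPServer whose Connect method is shown:

// Sketch: server is a jsonrpc2.StreamServer, framer and handler are test values.
ts := servertest.NewTCPServer(ctx, server, framer)
conn := ts.Connect(ctx) // now a jsonrpc2.Conn value, not a *jsonrpc2.Conn
conn.Go(ctx, handler)
defer func() {
    conn.Close()
    <-conn.Done()
}()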
@@ -80,7 +80,7 @@ func NewEditor(ws *Sandbox, config EditorConfig) *Editor {
 //
 // It returns the editor, so that it may be called as follows:
 // editor, err := NewEditor(s).Connect(ctx, conn)
-func (e *Editor) Connect(ctx context.Context, conn *jsonrpc2.Conn, hooks ClientHooks) (*Editor, error) {
+func (e *Editor) Connect(ctx context.Context, conn jsonrpc2.Conn, hooks ClientHooks) (*Editor, error) {
     e.Server = protocol.ServerDispatcher(conn)
     e.client = &Client{editor: e, hooks: hooks}
     conn.Go(ctx,
@@ -21,13 +21,13 @@ var (
 
 // ClientDispatcher returns a Client that dispatches LSP requests across the
 // given jsonrpc2 connection.
-func ClientDispatcher(conn *jsonrpc2.Conn) Client {
+func ClientDispatcher(conn jsonrpc2.Conn) Client {
     return &clientDispatcher{Conn: conn}
 }
 
 // ServerDispatcher returns a Server that dispatches LSP requests across the
 // given jsonrpc2 connection.
-func ServerDispatcher(conn *jsonrpc2.Conn) Server {
+func ServerDispatcher(conn jsonrpc2.Conn) Server {
     return &serverDispatcher{Conn: conn}
 }
 
@@ -72,7 +72,7 @@ func CancelHandler(handler jsonrpc2.Handler) jsonrpc2.Handler {
     }
 }
 
-func Call(ctx context.Context, conn *jsonrpc2.Conn, method string, params interface{}, result interface{}) error {
+func Call(ctx context.Context, conn jsonrpc2.Conn, method string, params interface{}, result interface{}) error {
     id, err := conn.Call(ctx, method, params, result)
     if ctx.Err() != nil {
         cancelCall(ctx, conn, id)
@@ -80,7 +80,7 @@ func Call(ctx context.Context, conn *jsonrpc2.Conn, method string, params interf
     return err
 }
 
-func cancelCall(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID) {
+func cancelCall(ctx context.Context, conn jsonrpc2.Conn, id jsonrpc2.ID) {
     ctx = xcontext.Detach(ctx)
     ctx, done := event.Start(ctx, "protocol.canceller")
     defer done()
@@ -129,7 +129,7 @@ func ClientHandler(client Client, handler jsonrpc2.Handler) jsonrpc2.Handler {
 }
 
 type clientDispatcher struct {
-    *jsonrpc2.Conn
+    jsonrpc2.Conn
 }
 
 func (s *clientDispatcher) ShowMessage(ctx context.Context, params *ShowMessageParams) error {
@@ -427,7 +427,7 @@ func ServerHandler(server Server, handler jsonrpc2.Handler) jsonrpc2.Handler {
 }
 
 type serverDispatcher struct {
-    *jsonrpc2.Conn
+    jsonrpc2.Conn
 }
 
 func (s *serverDispatcher) DidChangeWorkspaceFolders(ctx context.Context, params *DidChangeWorkspaceFoldersParams) error {
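Because clientDispatcher and serverDispatcher now embed the jsonrpc2.Conn interface rather than a *jsonrpc2.Conn, the promoted methods come from whatever implementation is supplied. A brief sketch, reusing the hypothetical loggingConn wrapper from the sketch near the top of this commit; stream is a placeholder for any jsonrpc2.Stream:

// Sketch: loggingConn is illustrative and not part of this commit.
base := jsonrpc2.NewConn(stream)
server := protocol.ServerDispatcher(loggingConn{Conn: base})
client := protocol.ClientDispatcher(loggingConn{Conn: base})
_, _ = server, client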
@@ -1096,7 +1096,7 @@ function output(side: side) {
     }`);
     f(`
 type ${side.name}Dispatcher struct {
-    *jsonrpc2.Conn
+    jsonrpc2.Conn
 }
 `);
     side.calls.forEach((v) => {f(v)});
@@ -31,7 +31,7 @@ type Env struct {
     Sandbox *fake.Sandbox
     Editor  *fake.Editor
     Server  servertest.Connector
-    Conn    *jsonrpc2.Conn
+    Conn    jsonrpc2.Conn
 
     // mu guards the fields below, for the purpose of checking conditions on
     // every change to diagnostics.