1
0
mirror of https://github.com/golang/go synced 2024-11-18 09:04:49 -07:00

internal/lsp: refactor LSP server instantiation

Previously, the process of instantiating and running the LSP server was
sharded across the lsp, protocol, and cmd packages, and this resulted in
some APIs that are hard to work with. For example, it's hard to guess
the difference between lsp.NewClientServer, lsp.NewServer,
protocol.NewServer (which returns a client), and protocol.NewClient
(which returns a server).

This change reorganizes Server instantiation as follows:

 + The lsp.Server is now purely an implementation of the protocol.Server
   interface. It is no longer responsible for installing itself into the
   jsonrpc2 Stream, nor for running itself.

 + A new package 'lsprpc' is added, to implement the logic of binding an
   incoming connection to an LSP server session. This is put in a
   separate package for lack of a clear home: it didn't really
   philosophically belong in any of the lsp, cmd, or protocol packages.
   We can perhaps move it to cmd in the future, but I'd like to keep it
   as a separate package while I develop request forwarding.

   simplified import graph:

    jsonrpc2 ⭠ lsprpc ⭠ cmd
               ⭩           ⭦
            lsp           (t.b.d. client tests)
           ⭩   ⭨
     protocol  source

 + The jsonrpc2 package is extended to have a minimal API for running a
   'StreamServer': something analogous to an HTTP server that listens
   for new connections and delegates to a handler (but we couldn't use
   the word 'Handler' for this delegate as it was already taken).

After these changes, I hope that the concerns of "serving the LSP",
"serving jsonrpc2", and "installing the LSP on jsonrpc2" are more
logically organized, though one legitimate criticism is that the word
'Server' is still heavily overloaded.

This change prepares a subsequent change which hijacks the jsonrpc2
connection when forwarding messages to a shared gopls instance.

To test this change, the following improvements are made:

 + A servertest package is added to make it easier to run a test against
   an in-process jsonrpc2 server. For now, this uses TCP but it could
   easily be modified to use io.Pipe.

 + cmd tests are updated to use the servertest package. Unfortunately it
   wasn't yet possible to eliminate the concept of `remote=internal` in
   favor of just using multiple sessions, because view initialization
   involves calling both `go env` and `packages.Load`, which slow down
   session startup significantly. See also golang.org/issue/35968.

   Instead, the syntax for `-remote=internal` is modified to be
   `-remote=internal@127.0.0.1:12345`.

 + An additional test for request cancellation is added for the
   sessionserver package. This test uncovered a bug: when calling
   Canceller.Cancel, we were using id rather than &id, which resulted in
   incorrect json serialization (as only the pointer receiver implements
   the json.Marshaller interface).

Updates golang/go#34111

Change-Id: I75c219df634348cdf53a9e57839b98588311a9ef
Reviewed-on: https://go-review.googlesource.com/c/tools/+/215742
Run-TryBot: Robert Findley <rfindley@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
This commit is contained in:
Rob Findley 2020-01-21 19:34:50 -05:00 committed by Robert Findley
parent 37215997d4
commit c29062fe1d
16 changed files with 548 additions and 300 deletions

View File

@ -10,7 +10,10 @@ import (
"golang.org/x/tools/go/packages/packagestest"
"golang.org/x/tools/gopls/internal/hooks"
"golang.org/x/tools/internal/jsonrpc2/servertest"
"golang.org/x/tools/internal/lsp/cache"
cmdtest "golang.org/x/tools/internal/lsp/cmd/test"
"golang.org/x/tools/internal/lsp/lsprpc"
"golang.org/x/tools/internal/lsp/source"
"golang.org/x/tools/internal/lsp/tests"
"golang.org/x/tools/internal/testenv"
@ -37,11 +40,15 @@ func testCommandLine(t *testing.T, exporter packagestest.Exporter) {
t.Skip("testdata directory not present")
}
data := tests.Load(t, exporter, testdata)
ctx := tests.Context(t)
cache := cache.New(commandLineOptions)
ss := lsprpc.NewStreamServer(cache, false)
ts := servertest.NewServer(ctx, ss)
for _, data := range data {
defer data.Exported.Cleanup()
t.Run(data.Folder, func(t *testing.T) {
t.Helper()
tests.Run(t, cmdtest.NewRunner(exporter, data, tests.Context(t), commandLineOptions), data)
tests.Run(t, cmdtest.NewRunner(exporter, data, tests.Context(t), ts.Addr, commandLineOptions), data)
})
}
}

View File

@ -0,0 +1,63 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package jsonrpc2
import (
"context"
"net"
)
// NOTE: This file provides an experimental API for serving multiple remote
// jsonrpc2 clients over the network. For now, it is intentionally similar to
// net/http, but that may change in the future as we figure out the correct
// semantics.
// A StreamServer is used to serve incoming jsonrpc2 clients communicating over
// a newly created stream.
type StreamServer interface {
ServeStream(context.Context, Stream) error
}
// The ServerFunc type is an adapter that implements the StreamServer interface
// using an ordinary function.
type ServerFunc func(context.Context, Stream) error
// ServeStream calls f(ctx, s).
func (f ServerFunc) ServeStream(ctx context.Context, s Stream) error {
return f(ctx, s)
}
// HandlerServer returns a StreamServer that handles incoming streams using the
// provided handler.
func HandlerServer(h Handler) StreamServer {
return ServerFunc(func(ctx context.Context, s Stream) error {
conn := NewConn(s)
conn.AddHandler(h)
return conn.Run(ctx)
})
}
// ListenAndServe starts an jsonrpc2 server on the given address. It exits only
// on error.
func ListenAndServe(ctx context.Context, addr string, server StreamServer) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return Serve(ctx, ln, server)
}
// Serve accepts incoming connections from the network, and handles them using
// the provided server. It exits only on error.
func Serve(ctx context.Context, ln net.Listener, server StreamServer) error {
for {
netConn, err := ln.Accept()
if err != nil {
return err
}
stream := NewHeaderStream(netConn, netConn)
go server.ServeStream(ctx, stream)
}
}

View File

@ -0,0 +1,54 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package servertest provides utilities for running tests against a remote LSP
// server.
package servertest
import (
"context"
"fmt"
"net"
"golang.org/x/tools/internal/jsonrpc2"
)
// Server is a helper for executing tests against a remote jsonrpc2 connection.
// Once initialized, its Addr field may be used to connect a jsonrpc2 client.
type Server struct {
Addr string
ln net.Listener
}
// NewServer returns a new test server listening on local tcp port and serving
// incoming jsonrpc2 streams using the provided stream server. It panics on any
// error.
func NewServer(ctx context.Context, server jsonrpc2.StreamServer) *Server {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(fmt.Sprintf("servertest: failed to listen: %v", err))
}
go jsonrpc2.Serve(ctx, ln, server)
return &Server{Addr: ln.Addr().String(), ln: ln}
}
// Connect dials the test server and returns a jsonrpc2 Connection that is
// ready for use.
func (s *Server) Connect(ctx context.Context) *jsonrpc2.Conn {
netConn, err := net.Dial("tcp", s.Addr)
if err != nil {
panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err))
}
conn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn))
go conn.Run(ctx)
return conn
}
// Close is a placeholder for proper test server shutdown.
// TODO: implement proper shutdown, which gracefully closes existing
// connections to the test server.
func (s *Server) Close() error {
return nil
}

View File

@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package servertest
import (
"context"
"testing"
"time"
"golang.org/x/tools/internal/jsonrpc2"
)
type fakeHandler struct {
jsonrpc2.EmptyHandler
}
type msg struct {
Msg string
}
func (fakeHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
if err := r.Reply(ctx, &msg{"pong"}, nil); err != nil {
panic(err)
}
return true
}
func TestTestServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ts := NewServer(ctx, jsonrpc2.HandlerServer(fakeHandler{}))
defer ts.Close()
conn := ts.Connect(ctx)
var got msg
if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil {
t.Fatal(err)
}
if want := "pong"; got.Msg != want {
t.Errorf("conn.Call(...): returned %q, want %q", got, want)
}
}

View File

@ -40,7 +40,7 @@ func TestCapabilities(t *testing.T) {
params.Capabilities.Workspace.Configuration = true
// Send an initialize request to the server.
ctx, c.Server = lsp.NewClientServer(ctx, cache.New(app.options).NewSession(), c.Client)
c.Server = lsp.NewServer(cache.New(app.options).NewSession(), c.Client)
result, err := c.Server.Initialize(ctx, params)
if err != nil {
t.Fatal(err)

View File

@ -190,47 +190,47 @@ var (
)
func (app *Application) connect(ctx context.Context) (*connection, error) {
switch app.Remote {
case "":
switch {
case app.Remote == "":
connection := newConnection(app)
ctx, connection.Server = lsp.NewClientServer(ctx, cache.New(app.options).NewSession(), connection.Client)
connection.Server = lsp.NewServer(cache.New(app.options).NewSession(), connection.Client)
ctx = protocol.WithClient(ctx, connection.Client)
return connection, connection.initialize(ctx, app.options)
case "internal":
case strings.HasPrefix(app.Remote, "internal@"):
internalMu.Lock()
defer internalMu.Unlock()
if c := internalConnections[app.wd]; c != nil {
return c, nil
}
connection := newConnection(app)
remote := app.Remote[len("internal@"):]
ctx := xcontext.Detach(ctx) //TODO:a way of shutting down the internal server
cr, sw, _ := os.Pipe()
sr, cw, _ := os.Pipe()
var jc *jsonrpc2.Conn
ctx, jc, connection.Server = protocol.NewClient(ctx, jsonrpc2.NewHeaderStream(cr, cw), connection.Client)
go jc.Run(ctx)
go func() {
ctx, srv := lsp.NewServer(ctx, cache.New(app.options).NewSession(), jsonrpc2.NewHeaderStream(sr, sw))
srv.Run(ctx)
}()
if err := connection.initialize(ctx, app.options); err != nil {
connection, err := app.connectRemote(ctx, remote)
if err != nil {
return nil, err
}
internalConnections[app.wd] = connection
return connection, nil
default:
connection := newConnection(app)
conn, err := net.Dial("tcp", app.Remote)
if err != nil {
return nil, err
}
stream := jsonrpc2.NewHeaderStream(conn, conn)
var jc *jsonrpc2.Conn
ctx, jc, connection.Server = protocol.NewClient(ctx, stream, connection.Client)
go jc.Run(ctx)
return connection, connection.initialize(ctx, app.options)
return app.connectRemote(ctx, app.Remote)
}
}
func (app *Application) connectRemote(ctx context.Context, remote string) (*connection, error) {
connection := newConnection(app)
conn, err := net.Dial("tcp", remote)
if err != nil {
return nil, err
}
stream := jsonrpc2.NewHeaderStream(conn, conn)
cc := jsonrpc2.NewConn(stream)
connection.Server = protocol.ServerDispatcher(cc)
cc.AddHandler(protocol.ClientHandler(connection.Client))
cc.AddHandler(protocol.Canceller{})
ctx = protocol.WithClient(ctx, connection.Client)
go cc.Run(ctx)
return connection, connection.initialize(ctx, app.options)
}
func (c *connection) initialize(ctx context.Context, options func(*source.Options)) error {
params := &protocol.ParamInitialize{}
params.RootURI = string(span.FileURI(c.Client.app.wd))
@ -447,7 +447,7 @@ func (c *connection) diagnoseFiles(ctx context.Context, files []span.URI) error
}
func (c *connection) terminate(ctx context.Context) {
if c.Client.app.Remote == "internal" {
if strings.HasPrefix(c.Client.app.Remote, "internal@") {
// internal connections need to be left alive for the next test
return
}

View File

@ -5,6 +5,7 @@
package cmd_test
import (
"context"
"fmt"
"os"
"path/filepath"
@ -13,8 +14,11 @@ import (
"testing"
"golang.org/x/tools/go/packages/packagestest"
"golang.org/x/tools/internal/jsonrpc2/servertest"
"golang.org/x/tools/internal/lsp/cache"
"golang.org/x/tools/internal/lsp/cmd"
cmdtest "golang.org/x/tools/internal/lsp/cmd/test"
"golang.org/x/tools/internal/lsp/lsprpc"
"golang.org/x/tools/internal/lsp/tests"
"golang.org/x/tools/internal/testenv"
)
@ -29,16 +33,24 @@ func TestCommandLine(t *testing.T) {
}
func testCommandLine(t *testing.T, exporter packagestest.Exporter) {
ctx := tests.Context(t)
ts := testServer(ctx)
data := tests.Load(t, exporter, "../testdata")
for _, datum := range data {
defer datum.Exported.Cleanup()
t.Run(datum.Folder, func(t *testing.T) {
t.Helper()
tests.Run(t, cmdtest.NewRunner(exporter, datum, tests.Context(t), nil), datum)
tests.Run(t, cmdtest.NewRunner(exporter, datum, ctx, ts.Addr, nil), datum)
})
}
}
func testServer(ctx context.Context) *servertest.Server {
cache := cache.New(nil)
ss := lsprpc.NewStreamServer(cache, false)
return servertest.NewServer(ctx, ss)
}
func TestDefinitionHelpExample(t *testing.T) {
// TODO: https://golang.org/issue/32794.
t.Skip()
@ -50,6 +62,8 @@ func TestDefinitionHelpExample(t *testing.T) {
t.Errorf("could not get wd: %v", err)
return
}
ctx := tests.Context(t)
ts := testServer(ctx)
thisFile := filepath.Join(dir, "definition.go")
baseArgs := []string{"query", "definition"}
expect := regexp.MustCompile(`(?s)^[\w/\\:_-]+flag[/\\]flag.go:\d+:\d+-\d+: defined here as FlagSet struct {.*}$`)
@ -57,7 +71,7 @@ func TestDefinitionHelpExample(t *testing.T) {
fmt.Sprintf("%v:%v:%v", thisFile, cmd.ExampleLine, cmd.ExampleColumn),
fmt.Sprintf("%v:#%v", thisFile, cmd.ExampleOffset)} {
args := append(baseArgs, query)
r := cmdtest.NewRunner(nil, nil, tests.Context(t), nil)
r := cmdtest.NewRunner(nil, nil, ctx, ts.Addr, nil)
got, _ := r.NormalizeGoplsCmd(t, args...)
if !expect.MatchString(got) {
t.Errorf("test with %v\nexpected:\n%s\ngot:\n%s", args, expect, got)

View File

@ -6,7 +6,6 @@ package cmd
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
@ -17,12 +16,10 @@ import (
"time"
"golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/lsp"
"golang.org/x/tools/internal/lsp/cache"
"golang.org/x/tools/internal/lsp/debug"
"golang.org/x/tools/internal/lsp/lsprpc"
"golang.org/x/tools/internal/lsp/protocol"
"golang.org/x/tools/internal/lsp/telemetry"
"golang.org/x/tools/internal/telemetry/trace"
"golang.org/x/tools/internal/tool"
errors "golang.org/x/xerrors"
)
@ -82,23 +79,19 @@ func (s *Serve) Run(ctx context.Context, args ...string) error {
return s.forward()
}
prepare := func(ctx context.Context, srv *lsp.Server) *lsp.Server {
srv.Conn.AddHandler(&handler{})
return srv
}
run := func(ctx context.Context, srv *lsp.Server) { go prepare(ctx, srv).Run(ctx) }
ss := lsprpc.NewStreamServer(cache.New(s.app.options), true)
if s.Address != "" {
return lsp.RunServerOnAddress(ctx, cache.New(s.app.options), s.Address, run)
return jsonrpc2.ListenAndServe(ctx, s.Address, ss)
}
if s.Port != 0 {
return lsp.RunServerOnPort(ctx, cache.New(s.app.options), s.Port, run)
addr := fmt.Sprintf(":%v", s.Port)
return jsonrpc2.ListenAndServe(ctx, addr, ss)
}
stream := jsonrpc2.NewHeaderStream(os.Stdin, os.Stdout)
if s.Trace {
stream = protocol.LoggingStream(stream, out)
}
ctx, srv := lsp.NewServer(ctx, cache.New(s.app.options).NewSession(), stream)
return prepare(ctx, srv).Run(ctx)
return ss.ServeStream(ctx, stream)
}
func (s *Serve) forward() error {
@ -134,103 +127,3 @@ func (d debugServe) Port() int { return d.s.Port }
func (d debugServe) Address() string { return d.s.Address }
func (d debugServe) Debug() string { return d.s.Debug }
func (d debugServe) Workdir() string { return d.s.app.wd }
type handler struct{}
type rpcStats struct {
method string
direction jsonrpc2.Direction
id *jsonrpc2.ID
payload *json.RawMessage
start time.Time
delivering func()
close func()
}
type statsKeyType int
const statsKey = statsKeyType(0)
func (h *handler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
stats := h.getStats(ctx)
if stats != nil {
stats.delivering()
}
return false
}
func (h *handler) Cancel(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID, cancelled bool) bool {
return false
}
func (h *handler) Request(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireRequest) context.Context {
if r.Method == "" {
panic("no method in rpc stats")
}
stats := &rpcStats{
method: r.Method,
start: time.Now(),
direction: direction,
payload: r.Params,
}
ctx = context.WithValue(ctx, statsKey, stats)
mode := telemetry.Outbound
if direction == jsonrpc2.Receive {
mode = telemetry.Inbound
}
ctx, stats.close = trace.StartSpan(ctx, r.Method,
telemetry.Method.Of(r.Method),
telemetry.RPCDirection.Of(mode),
telemetry.RPCID.Of(r.ID),
)
telemetry.Started.Record(ctx, 1)
_, stats.delivering = trace.StartSpan(ctx, "queued")
return ctx
}
func (h *handler) Response(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireResponse) context.Context {
return ctx
}
func (h *handler) Done(ctx context.Context, err error) {
stats := h.getStats(ctx)
if err != nil {
ctx = telemetry.StatusCode.With(ctx, "ERROR")
} else {
ctx = telemetry.StatusCode.With(ctx, "OK")
}
elapsedTime := time.Since(stats.start)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
telemetry.Latency.Record(ctx, latencyMillis)
stats.close()
}
func (h *handler) Read(ctx context.Context, bytes int64) context.Context {
telemetry.SentBytes.Record(ctx, bytes)
return ctx
}
func (h *handler) Wrote(ctx context.Context, bytes int64) context.Context {
telemetry.ReceivedBytes.Record(ctx, bytes)
return ctx
}
const eol = "\r\n\r\n\r\n"
func (h *handler) Error(ctx context.Context, err error) {
}
func (h *handler) getStats(ctx context.Context) *rpcStats {
stats, ok := ctx.Value(statsKey).(*rpcStats)
if !ok || stats == nil {
method, ok := ctx.Value(telemetry.Method).(string)
if !ok {
method = "???"
}
stats = &rpcStats{
method: method,
close: func() {},
}
}
return stats
}

View File

@ -1,86 +0,0 @@
package cmd
import (
"context"
"io"
"regexp"
"testing"
"time"
"golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/lsp/protocol"
"golang.org/x/tools/internal/telemetry/log"
)
type fakeServer struct {
protocol.Server
client protocol.Client
}
func (s *fakeServer) DidOpen(ctx context.Context, params *protocol.DidOpenTextDocumentParams) error {
// Our instrumentation should cause this message to be logged back to the LSP
// client.
log.Print(ctx, "ping")
return nil
}
type fakeClient struct {
protocol.Client
logs chan string
}
func (c *fakeClient) LogMessage(ctx context.Context, params *protocol.LogMessageParams) error {
c.logs <- params.Message
return nil
}
func TestClientLogging(t *testing.T) {
server := &fakeServer{}
client := &fakeClient{logs: make(chan string)}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Bind our fake client and server.
// sReader and sWriter read from and write to the server. cReader and cWriter
// read from and write to the client.
sReader, sWriter := io.Pipe()
cReader, cWriter := io.Pipe()
close := func() {
failOnErr := func(err error) {
if err != nil {
t.Fatal(err)
}
}
failOnErr(sReader.Close())
failOnErr(cReader.Close())
failOnErr(sWriter.Close())
failOnErr(cWriter.Close())
}
defer close()
serverStream := jsonrpc2.NewStream(sReader, cWriter)
// The returned client dispatches to the client, but it is already stored
// in the context by NewServer, so we can ignore it.
serverCtx, serverConn, _ := protocol.NewServer(ctx, serverStream, server)
serverConn.AddHandler(&handler{})
clientStream := jsonrpc2.NewStream(cReader, sWriter)
clientCtx, clientConn, serverDispatch := protocol.NewClient(ctx, clientStream, client)
go clientConn.Run(clientCtx)
go serverConn.Run(serverCtx)
serverDispatch.DidOpen(ctx, &protocol.DidOpenTextDocumentParams{})
select {
case got := <-client.logs:
want := "ping"
matched, err := regexp.MatchString(want, got)
if err != nil {
t.Fatal(err)
}
if !matched {
t.Errorf("got log %q, want a log containing %q", got, want)
}
case <-time.After(1 * time.Second):
t.Error("timeout waiting for client log")
}
}

View File

@ -22,7 +22,7 @@ func (r *runner) Diagnostics(t *testing.T, uri span.URI, want []source.Diagnosti
t.Skip("skipping circular diagnostics tests due to golang/go#36265")
}
fname := uri.Filename()
out, _ := r.RunGoplsCmd(t, "check", fname)
out, _ := r.runGoplsCmd(t, "check", fname)
// parse got into a collection of reports
got := map[string]struct{}{}
for _, l := range strings.Split(out, "\n") {

View File

@ -31,6 +31,7 @@ type runner struct {
ctx context.Context
options func(*source.Options)
normalizers []normalizer
remote string
}
type normalizer struct {
@ -40,13 +41,14 @@ type normalizer struct {
fragment string
}
func NewRunner(exporter packagestest.Exporter, data *tests.Data, ctx context.Context, options func(*source.Options)) *runner {
func NewRunner(exporter packagestest.Exporter, data *tests.Data, ctx context.Context, remote string, options func(*source.Options)) *runner {
r := &runner{
exporter: exporter,
data: data,
ctx: ctx,
options: options,
normalizers: make([]normalizer, 0, len(data.Exported.Modules)),
remote: remote,
}
// build the path normalizing patterns
for _, m := range data.Exported.Modules {
@ -100,7 +102,7 @@ func (r *runner) WorkspaceSymbols(*testing.T, string, []protocol.SymbolInformati
//TODO: add command line workspace symbol tests when it works
}
func (r *runner) RunGoplsCmd(t testing.TB, args ...string) (string, string) {
func (r *runner) runGoplsCmd(t testing.TB, args ...string) (string, string) {
rStdout, wStdout, err := os.Pipe()
if err != nil {
t.Fatal(err)
@ -122,9 +124,10 @@ func (r *runner) RunGoplsCmd(t testing.TB, args ...string) (string, string) {
os.Stdout = wStdout
os.Stderr = wStderr
app := cmd.New("gopls-test", r.data.Config.Dir, r.data.Exported.Config.Env, r.options)
remote := r.remote
err = tool.Run(tests.Context(t),
app,
append([]string{"-remote=internal"}, args...))
append([]string{fmt.Sprintf("-remote=internal@%s", remote)}, args...))
if err != nil {
fmt.Fprint(os.Stderr, err)
}
@ -143,7 +146,7 @@ func (r *runner) RunGoplsCmd(t testing.TB, args ...string) (string, string) {
// NormalizeGoplsCmd runs the gopls command and normalizes its output.
func (r *runner) NormalizeGoplsCmd(t testing.TB, args ...string) (string, string) {
stdout, stderr := r.RunGoplsCmd(t, args...)
stdout, stderr := r.runGoplsCmd(t, args...)
return r.Normalize(stdout), r.Normalize(stderr)
}

View File

@ -0,0 +1,53 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package lsprpc implements a jsonrpc2.StreamServer that may be used to
// serve the LSP on a jsonrpc2 channel.
package lsprpc
import (
"context"
"golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/lsp"
"golang.org/x/tools/internal/lsp/protocol"
"golang.org/x/tools/internal/lsp/source"
)
// The StreamServer type is a jsonrpc2.StreamServer that handles incoming
// streams as a new LSP session, using a shared cache.
type StreamServer struct {
withTelemetry bool
// accept is mutable for testing.
accept func(protocol.Client) protocol.Server
}
// NewStreamServer creates a StreamServer using the shared cache. If
// withTelemetry is true, each session is instrumented with telemetry that
// records RPC statistics.
func NewStreamServer(cache source.Cache, withTelemetry bool) *StreamServer {
s := &StreamServer{
withTelemetry: withTelemetry,
}
s.accept = func(c protocol.Client) protocol.Server {
session := cache.NewSession()
return lsp.NewServer(session, c)
}
return s
}
// ServeStream implements the jsonrpc2.StreamServer interface, by handling
// incoming streams using a new lsp server.
func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
conn := jsonrpc2.NewConn(stream)
client := protocol.ClientDispatcher(conn)
server := s.accept(client)
conn.AddHandler(protocol.ServerHandler(server))
conn.AddHandler(protocol.Canceller{})
if s.withTelemetry {
conn.AddHandler(telemetryHandler{})
}
return conn.Run(protocol.WithClient(ctx, client))
}

View File

@ -0,0 +1,119 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lsprpc
import (
"context"
"regexp"
"testing"
"time"
"golang.org/x/tools/internal/jsonrpc2/servertest"
"golang.org/x/tools/internal/lsp/protocol"
"golang.org/x/tools/internal/telemetry/log"
)
type fakeClient struct {
protocol.Client
logs chan string
}
func (c fakeClient) LogMessage(ctx context.Context, params *protocol.LogMessageParams) error {
c.logs <- params.Message
return nil
}
type pingServer struct{ protocol.Server }
func (s pingServer) DidOpen(ctx context.Context, params *protocol.DidOpenTextDocumentParams) error {
log.Print(ctx, "ping")
return nil
}
func TestClientLogging(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
server := pingServer{}
client := fakeClient{logs: make(chan string, 10)}
ss := &StreamServer{
accept: func(c protocol.Client) protocol.Server {
return server
},
}
ts := servertest.NewServer(ctx, ss)
cc := ts.Connect(ctx)
cc.AddHandler(protocol.ClientHandler(client))
protocol.ServerDispatcher(cc).DidOpen(ctx, &protocol.DidOpenTextDocumentParams{})
select {
case got := <-client.logs:
want := "ping"
matched, err := regexp.MatchString(want, got)
if err != nil {
t.Fatal(err)
}
if !matched {
t.Errorf("got log %q, want a log containing %q", got, want)
}
case <-time.After(1000 * time.Second):
t.Error("timeout waiting for client log")
}
}
type waitableServer struct {
protocol.Server
started chan struct{}
// finished records whether the request ended with a cancellation or not
// (true means the request was cancelled).
finished chan bool
}
func (s waitableServer) CodeLens(ctx context.Context, params *protocol.CodeLensParams) ([]protocol.CodeLens, error) {
s.started <- struct{}{}
cancelled := false
defer func() {
s.finished <- cancelled
}()
select {
case <-ctx.Done():
cancelled = true
return nil, ctx.Err()
case <-time.After(1 * time.Second):
cancelled = false
}
return []protocol.CodeLens{}, nil
}
func TestRequestCancellation(t *testing.T) {
server := waitableServer{
started: make(chan struct{}),
finished: make(chan bool),
}
ss := &StreamServer{
accept: func(c protocol.Client) protocol.Server {
return server
},
}
ctx := context.Background()
ts := servertest.NewServer(ctx, ss)
cc := ts.Connect(ctx)
cc.AddHandler(protocol.Canceller{})
lensCtx, cancelLens := context.WithCancel(context.Background())
go func() {
protocol.ServerDispatcher(cc).CodeLens(lensCtx, &protocol.CodeLensParams{})
}()
<-server.started
cancelLens()
if got, want := <-server.finished, true; got != want {
t.Errorf("CodeLens was cancelled: %t, want %t", got, want)
}
}
// TODO: add a test for telemetry.

View File

@ -0,0 +1,115 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lsprpc
import (
"context"
"encoding/json"
"time"
"golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/lsp/telemetry"
"golang.org/x/tools/internal/telemetry/trace"
)
type telemetryHandler struct{}
func (h telemetryHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
stats := h.getStats(ctx)
if stats != nil {
stats.delivering()
}
return false
}
func (h telemetryHandler) Cancel(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID, cancelled bool) bool {
return false
}
func (h telemetryHandler) Request(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireRequest) context.Context {
if r.Method == "" {
panic("no method in rpc stats")
}
stats := &rpcStats{
method: r.Method,
start: time.Now(),
direction: direction,
payload: r.Params,
}
ctx = context.WithValue(ctx, statsKey, stats)
mode := telemetry.Outbound
if direction == jsonrpc2.Receive {
mode = telemetry.Inbound
}
ctx, stats.close = trace.StartSpan(ctx, r.Method,
telemetry.Method.Of(r.Method),
telemetry.RPCDirection.Of(mode),
telemetry.RPCID.Of(r.ID),
)
telemetry.Started.Record(ctx, 1)
_, stats.delivering = trace.StartSpan(ctx, "queued")
return ctx
}
func (h telemetryHandler) Response(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireResponse) context.Context {
return ctx
}
func (h telemetryHandler) Done(ctx context.Context, err error) {
stats := h.getStats(ctx)
if err != nil {
ctx = telemetry.StatusCode.With(ctx, "ERROR")
} else {
ctx = telemetry.StatusCode.With(ctx, "OK")
}
elapsedTime := time.Since(stats.start)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
telemetry.Latency.Record(ctx, latencyMillis)
stats.close()
}
func (h telemetryHandler) Read(ctx context.Context, bytes int64) context.Context {
telemetry.SentBytes.Record(ctx, bytes)
return ctx
}
func (h telemetryHandler) Wrote(ctx context.Context, bytes int64) context.Context {
telemetry.ReceivedBytes.Record(ctx, bytes)
return ctx
}
const eol = "\r\n\r\n\r\n"
func (h telemetryHandler) Error(ctx context.Context, err error) {
}
func (h telemetryHandler) getStats(ctx context.Context) *rpcStats {
stats, ok := ctx.Value(statsKey).(*rpcStats)
if !ok || stats == nil {
method, ok := ctx.Value(telemetry.Method).(string)
if !ok {
method = "???"
}
stats = &rpcStats{
method: method,
close: func() {},
}
}
return stats
}
type rpcStats struct {
method string
direction jsonrpc2.Direction
id *jsonrpc2.ID
payload *json.RawMessage
start time.Time
delivering func()
close func()
}
type statsKeyType int
const statsKey = statsKeyType(0)

View File

@ -20,19 +20,44 @@ const (
RequestCancelledError = -32800
)
type canceller struct{ jsonrpc2.EmptyHandler }
type clientHandler struct {
jsonrpc2.EmptyHandler
client Client
}
// ClientHandler returns a jsonrpc2.Handler that handles the LSP client
// protocol.
func ClientHandler(client Client) jsonrpc2.Handler {
return &clientHandler{client: client}
}
type serverHandler struct {
jsonrpc2.EmptyHandler
server Server
}
func (canceller) Request(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireRequest) context.Context {
// ServerHandler returns a jsonrpc2.Handler that handles the LSP server
// protocol.
func ServerHandler(server Server) jsonrpc2.Handler {
return &serverHandler{server: server}
}
// ClientDispatcher returns a Client that dispatches LSP requests across the
// given jsonrpc2 connection.
func ClientDispatcher(conn *jsonrpc2.Conn) Client {
return &clientDispatcher{Conn: conn}
}
// ServerDispatcher returns a Server that dispatches LSP requests across the
// given jsonrpc2 connection.
func ServerDispatcher(conn *jsonrpc2.Conn) Server {
return &serverDispatcher{Conn: conn}
}
// Canceller is a jsonrpc2.Handler that handles LSP request cancellation.
type Canceller struct{ jsonrpc2.EmptyHandler }
func (Canceller) Request(ctx context.Context, conn *jsonrpc2.Conn, direction jsonrpc2.Direction, r *jsonrpc2.WireRequest) context.Context {
if direction == jsonrpc2.Receive && r.Method == "$/cancelRequest" {
var params CancelParams
if err := json.Unmarshal(*r.Params, &params); err != nil {
@ -53,39 +78,23 @@ func (canceller) Request(ctx context.Context, conn *jsonrpc2.Conn, direction jso
return ctx
}
func (canceller) Cancel(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID, cancelled bool) bool {
func (Canceller) Cancel(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID, cancelled bool) bool {
if cancelled {
return false
}
ctx = xcontext.Detach(ctx)
ctx, done := trace.StartSpan(ctx, "protocol.canceller")
defer done()
conn.Notify(ctx, "$/cancelRequest", &CancelParams{ID: id})
// Note that only *jsonrpc2.ID implements json.Marshaler.
conn.Notify(ctx, "$/cancelRequest", &CancelParams{ID: &id})
return true
}
func (canceller) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
func (Canceller) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered bool) bool {
// Hide cancellations from downstream handlers.
return r.Method == "$/cancelRequest"
}
func NewClient(ctx context.Context, stream jsonrpc2.Stream, client Client) (context.Context, *jsonrpc2.Conn, Server) {
ctx = WithClient(ctx, client)
conn := jsonrpc2.NewConn(stream)
conn.AddHandler(&clientHandler{client: client})
conn.AddHandler(&canceller{})
return ctx, conn, &serverDispatcher{Conn: conn}
}
func NewServer(ctx context.Context, stream jsonrpc2.Stream, server Server) (context.Context, *jsonrpc2.Conn, Client) {
conn := jsonrpc2.NewConn(stream)
client := &clientDispatcher{Conn: conn}
ctx = WithClient(ctx, client)
conn.AddHandler(&serverHandler{server: server})
conn.AddHandler(&canceller{})
return ctx, conn, client
}
func sendParseError(ctx context.Context, req *jsonrpc2.Request, err error) {
if _, ok := err.(*jsonrpc2.Error); !ok {
err = jsonrpc2.NewErrorf(jsonrpc2.CodeParseError, "%v", err)

View File

@ -7,8 +7,6 @@ package lsp
import (
"context"
"fmt"
"net"
"sync"
"golang.org/x/tools/internal/jsonrpc2"
@ -17,51 +15,14 @@ import (
"golang.org/x/tools/internal/span"
)
// NewClientServer
func NewClientServer(ctx context.Context, session source.Session, client protocol.Client) (context.Context, *Server) {
ctx = protocol.WithClient(ctx, client)
return ctx, &Server{
client: client,
session: session,
delivered: make(map[span.URI]sentDiagnostics),
}
}
// NewServer creates an LSP server and binds it to handle incoming client
// messages on on the supplied stream.
func NewServer(ctx context.Context, session source.Session, stream jsonrpc2.Stream) (context.Context, *Server) {
s := &Server{
func NewServer(session source.Session, client protocol.Client) *Server {
return &Server{
delivered: make(map[span.URI]sentDiagnostics),
session: session,
client: client,
}
ctx, s.Conn, s.client = protocol.NewServer(ctx, stream, s)
return ctx, s
}
// RunServerOnPort starts an LSP server on the given port and does not exit.
// This function exists for debugging purposes.
func RunServerOnPort(ctx context.Context, cache source.Cache, port int, h func(ctx context.Context, s *Server)) error {
return RunServerOnAddress(ctx, cache, fmt.Sprintf(":%v", port), h)
}
// RunServerOnAddress starts an LSP server on the given address and does not
// exit. This function exists for debugging purposes.
func RunServerOnAddress(ctx context.Context, cache source.Cache, addr string, h func(ctx context.Context, s *Server)) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
for {
conn, err := ln.Accept()
if err != nil {
return err
}
h(NewServer(ctx, cache.NewSession(), jsonrpc2.NewHeaderStream(conn, conn)))
}
}
func (s *Server) Run(ctx context.Context) error {
return s.Conn.Run(ctx)
}
type serverState int
@ -73,8 +34,8 @@ const (
serverShutDown
)
// Server implements the protocol.Server interface.
type Server struct {
Conn *jsonrpc2.Conn
client protocol.Client
stateMu sync.Mutex