1
0
mirror of https://github.com/golang/go synced 2024-09-30 20:28:32 -06:00

internal/jsonrpc2: support serving over unix domain sockets

For tests (and perhaps later, for daemon discovery), unix domain sockets
offer advantages over TCP: we can know the exact socket address that will be
used when starting a server subprocess. They also offer performance and
security advantages over TCP, and were specifically requested on
golang.org/issues/34111.

This CL adds support for listening on UDS, and uses this to implement an
additional regtest environment mode that starts up an external process.
This mode is disabled by default, but may be enabled by the
-enable_gopls_subprocess_tests.

The regtest TestMain may be hijacked to instead run as gopls, if a
special environment variable is set. This allows the the test runner to
start a separate process by using os.Argv[0]. The -gopls_test_binary
flag may be used to point tests at a separate gopls binary.

Updates golang/go#36879
Updates golang/go#34111

Change-Id: I1cfdf55040e81ffa69a6726878a96529e5522e82
Reviewed-on: https://go-review.googlesource.com/c/tools/+/218839
Run-TryBot: Robert Findley <rfindley@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
This commit is contained in:
Rob Findley 2020-02-10 11:34:13 -05:00 committed by Robert Findley
parent 741f65b509
commit 5fb17a1e7b
10 changed files with 237 additions and 40 deletions

View File

@ -6,6 +6,7 @@ package jsonrpc2
import ( import (
"context" "context"
"log"
"net" "net"
) )
@ -41,8 +42,8 @@ func HandlerServer(h Handler) StreamServer {
// ListenAndServe starts an jsonrpc2 server on the given address. It exits only // ListenAndServe starts an jsonrpc2 server on the given address. It exits only
// on error. // on error.
func ListenAndServe(ctx context.Context, addr string, server StreamServer) error { func ListenAndServe(ctx context.Context, network, addr string, server StreamServer) error {
ln, err := net.Listen("tcp", addr) ln, err := net.Listen(network, addr)
if err != nil { if err != nil {
return err return err
} }
@ -58,6 +59,10 @@ func Serve(ctx context.Context, ln net.Listener, server StreamServer) error {
return err return err
} }
stream := NewHeaderStream(netConn, netConn) stream := NewHeaderStream(netConn, netConn)
go server.ServeStream(ctx, stream) go func() {
if err := server.ServeStream(ctx, stream); err != nil {
log.Printf("serving stream: %v", err)
}
}()
} }
} }

View File

@ -27,7 +27,7 @@ type Connector interface {
type TCPServer struct { type TCPServer struct {
Addr string Addr string
ln net.Listener ln net.Listener
cls *closerList cls *closerList
} }
@ -67,7 +67,7 @@ func (s *TCPServer) Close() error {
// PipeServer is a test server that handles connections over io.Pipes. // PipeServer is a test server that handles connections over io.Pipes.
type PipeServer struct { type PipeServer struct {
server jsonrpc2.StreamServer server jsonrpc2.StreamServer
cls *closerList cls *closerList
} }
// NewPipeServer returns a test server that can be connected to via io.Pipes. // NewPipeServer returns a test server that can be connected to via io.Pipes.
@ -107,7 +107,7 @@ func (s *PipeServer) Close() error {
// convenience, so that callers don't have to worry about closing each // convenience, so that callers don't have to worry about closing each
// connection. // connection.
type closerList struct { type closerList struct {
mu sync.Mutex mu sync.Mutex
closers []func() closers []func()
} }

View File

@ -58,7 +58,7 @@ type Application struct {
env []string env []string
// Support for remote lsp server // Support for remote lsp server
Remote string `flag:"remote" help:"*EXPERIMENTAL* - forward all commands to a remote lsp"` Remote string `flag:"remote" help:"*EXPERIMENTAL* - forward all commands to a remote lsp specified by this flag. If prefixed by 'unix;', the subsequent address is assumed to be a unix domain socket. Otherwise, TCP is used."`
// Enable verbose logging // Enable verbose logging
Verbose bool `flag:"v" help:"verbose output"` Verbose bool `flag:"v" help:"verbose output"`

View File

@ -9,6 +9,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"strings"
"golang.org/x/tools/internal/jsonrpc2" "golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/lsp/cache" "golang.org/x/tools/internal/lsp/cache"
@ -23,7 +24,7 @@ type Serve struct {
Logfile string `flag:"logfile" help:"filename to log to. if value is \"auto\", then logging to a default output file is enabled"` Logfile string `flag:"logfile" help:"filename to log to. if value is \"auto\", then logging to a default output file is enabled"`
Mode string `flag:"mode" help:"no effect"` Mode string `flag:"mode" help:"no effect"`
Port int `flag:"port" help:"port on which to run gopls for debugging purposes"` Port int `flag:"port" help:"port on which to run gopls for debugging purposes"`
Address string `flag:"listen" help:"address on which to listen for remote connections"` Address string `flag:"listen" help:"address on which to listen for remote connections. If prefixed by 'unix;', the subsequent address is assumed to be a unix domain socket. Otherwise, TCP is used."`
Trace bool `flag:"rpc.trace" help:"print the full rpc trace in lsp inspector format"` Trace bool `flag:"rpc.trace" help:"print the full rpc trace in lsp inspector format"`
Debug string `flag:"debug" help:"serve debug information on the supplied address"` Debug string `flag:"debug" help:"serve debug information on the supplied address"`
@ -64,17 +65,19 @@ func (s *Serve) Run(ctx context.Context, args ...string) error {
var ss jsonrpc2.StreamServer var ss jsonrpc2.StreamServer
if s.app.Remote != "" { if s.app.Remote != "" {
ss = lsprpc.NewForwarder(s.app.Remote, true) network, addr := parseAddr(s.app.Remote)
ss = lsprpc.NewForwarder(network, addr, true)
} else { } else {
ss = lsprpc.NewStreamServer(cache.New(s.app.options), true) ss = lsprpc.NewStreamServer(cache.New(s.app.options), true)
} }
if s.Address != "" { if s.Address != "" {
return jsonrpc2.ListenAndServe(ctx, s.Address, ss) network, addr := parseAddr(s.Address)
return jsonrpc2.ListenAndServe(ctx, network, addr, ss)
} }
if s.Port != 0 { if s.Port != 0 {
addr := fmt.Sprintf(":%v", s.Port) addr := fmt.Sprintf(":%v", s.Port)
return jsonrpc2.ListenAndServe(ctx, addr, ss) return jsonrpc2.ListenAndServe(ctx, "tcp", addr, ss)
} }
stream := jsonrpc2.NewHeaderStream(os.Stdin, os.Stdout) stream := jsonrpc2.NewHeaderStream(os.Stdin, os.Stdout)
if s.Trace { if s.Trace {
@ -82,3 +85,11 @@ func (s *Serve) Run(ctx context.Context, args ...string) error {
} }
return ss.ServeStream(ctx, stream) return ss.ServeStream(ctx, stream)
} }
// parseAddr parses the -listen flag in to a network, and address.
func parseAddr(listen string) (network string, address string) {
if parts := strings.SplitN(listen, ";", 2); len(parts) == 2 {
return parts[0], parts[1]
}
return "tcp", listen
}

View File

@ -0,0 +1,26 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package cmd
import "testing"
func TestListenParsing(t *testing.T) {
tests := []struct {
input, wantNetwork, wantAddr string
}{
{"127.0.0.1:0", "tcp", "127.0.0.1:0"},
{"unix;/tmp/sock", "unix", "/tmp/sock"},
}
for _, test := range tests {
gotNetwork, gotAddr := parseAddr(test.input)
if gotNetwork != test.wantNetwork {
t.Errorf("network = %q, want %q", gotNetwork, test.wantNetwork)
}
if gotAddr != test.wantAddr {
t.Errorf("addr = %q, want %q", gotAddr, test.wantAddr)
}
}
}

View File

@ -9,8 +9,10 @@ package lsprpc
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"os" "os"
"time"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/tools/internal/jsonrpc2" "golang.org/x/tools/internal/jsonrpc2"
@ -63,16 +65,24 @@ func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream)
// be instrumented with telemetry, and want to be able to in some cases hijack // be instrumented with telemetry, and want to be able to in some cases hijack
// the jsonrpc2 connection with the daemon. // the jsonrpc2 connection with the daemon.
type Forwarder struct { type Forwarder struct {
remote string network, addr string
// Configuration. Right now, not all of this may be customizable, but in the
// future it probably will be.
withTelemetry bool withTelemetry bool
dialTimeout time.Duration
retries int
} }
// NewForwarder creates a new Forwarder, ready to forward connections to the // NewForwarder creates a new Forwarder, ready to forward connections to the
// given remote. // remote server specified by network and addr.
func NewForwarder(remote string, withTelemetry bool) *Forwarder { func NewForwarder(network, addr string, withTelemetry bool) *Forwarder {
return &Forwarder{ return &Forwarder{
remote: remote, network: network,
addr: addr,
withTelemetry: withTelemetry, withTelemetry: withTelemetry,
dialTimeout: 1 * time.Second,
retries: 5,
} }
} }
@ -82,7 +92,26 @@ func (f *Forwarder) ServeStream(ctx context.Context, stream jsonrpc2.Stream) err
clientConn := jsonrpc2.NewConn(stream) clientConn := jsonrpc2.NewConn(stream)
client := protocol.ClientDispatcher(clientConn) client := protocol.ClientDispatcher(clientConn)
netConn, err := net.Dial("tcp", f.remote) var (
netConn net.Conn
err error
)
// Sometimes the forwarder will be started immediately after the server is
// started. To account for these cases, add in some simple retrying.
// Note that the number of total attempts is f.retries + 1.
for attempt := 0; attempt <= f.retries; attempt++ {
startDial := time.Now()
netConn, err = net.DialTimeout(f.network, f.addr, f.dialTimeout)
if err == nil {
break
}
log.Printf("failed an attempt to connect to remote: %v\n", err)
// In case our failure was a fast-failure, ensure we wait at least
// f.dialTimeout before trying again.
if attempt != f.retries {
time.Sleep(f.dialTimeout - time.Since(startDial))
}
}
if err != nil { if err != nil {
return fmt.Errorf("forwarder: dialing remote: %v", err) return fmt.Errorf("forwarder: dialing remote: %v", err)
} }

View File

@ -102,7 +102,7 @@ func TestRequestCancellation(t *testing.T) {
ctx := context.Background() ctx := context.Background()
tsDirect := servertest.NewTCPServer(ctx, ss) tsDirect := servertest.NewTCPServer(ctx, ss)
forwarder := NewForwarder(tsDirect.Addr, false) forwarder := NewForwarder("tcp", tsDirect.Addr, false)
tsForwarded := servertest.NewPipeServer(ctx, forwarder) tsForwarded := servertest.NewPipeServer(ctx, forwarder)
tests := []struct { tests := []struct {

View File

@ -6,8 +6,13 @@
package regtest package regtest
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -25,14 +30,17 @@ import (
type EnvMode int type EnvMode int
const ( const (
// Singleton mode uses a separate cache for each test // Singleton mode uses a separate cache for each test.
Singleton EnvMode = 1 << iota Singleton EnvMode = 1 << iota
// Shared mode uses a Shared cache // Shared mode uses a Shared cache.
Shared Shared
// Forwarded forwards connections // Forwarded forwards connections to an in-process gopls instance.
Forwarded Forwarded
// AllModes runs tests in all modes // SeparateProcess runs a separate gopls process, and forwards connections to
AllModes = Singleton | Shared | Forwarded // it.
SeparateProcess
// NormalModes runs tests in all modes.
NormalModes = Singleton | Shared | Forwarded
) )
// A Runner runs tests in gopls execution environments, as specified by its // A Runner runs tests in gopls execution environments, as specified by its
@ -40,26 +48,90 @@ const (
// remote), any tests that execute on the same Runner will share the same // remote), any tests that execute on the same Runner will share the same
// state. // state.
type Runner struct { type Runner struct {
ts *servertest.TCPServer
defaultModes EnvMode defaultModes EnvMode
timeout time.Duration timeout time.Duration
goplsPath string
mu sync.Mutex
ts *servertest.TCPServer
socketDir string
} }
// NewTestRunner creates a Runner with its shared state initialized, ready to // NewTestRunner creates a Runner with its shared state initialized, ready to
// run tests. // run tests.
func NewTestRunner(modes EnvMode, testTimeout time.Duration) *Runner { func NewTestRunner(modes EnvMode, testTimeout time.Duration, goplsPath string) *Runner {
ss := lsprpc.NewStreamServer(cache.New(nil), false)
ts := servertest.NewTCPServer(context.Background(), ss)
return &Runner{ return &Runner{
ts: ts,
defaultModes: modes, defaultModes: modes,
timeout: testTimeout, timeout: testTimeout,
goplsPath: goplsPath,
} }
} }
// Modes returns the bitmask of environment modes this runner is configured to
// test.
func (r *Runner) Modes() EnvMode {
return r.defaultModes
}
// getTestServer gets the test server instance to connect to, or creates one if
// it doesn't exist.
func (r *Runner) getTestServer() *servertest.TCPServer {
r.mu.Lock()
defer r.mu.Unlock()
if r.ts == nil {
ss := lsprpc.NewStreamServer(cache.New(nil), false)
r.ts = servertest.NewTCPServer(context.Background(), ss)
}
return r.ts
}
// runTestAsGoplsEnvvar triggers TestMain to run gopls instead of running
// tests. It's a trick to allow tests to find a binary to use to start a gopls
// subprocess.
const runTestAsGoplsEnvvar = "_GOPLS_TEST_BINARY_RUN_AS_GOPLS"
func (r *Runner) getRemoteSocket(t *testing.T) string {
t.Helper()
r.mu.Lock()
defer r.mu.Unlock()
const daemonFile = "gopls-test-daemon"
if r.socketDir != "" {
return filepath.Join(r.socketDir, daemonFile)
}
if r.goplsPath == "" {
t.Fatal("cannot run tests with a separate process unless a path to a gopls binary is configured")
}
var err error
r.socketDir, err = ioutil.TempDir("", "gopls-regtests")
if err != nil {
t.Fatalf("creating tempdir: %v", err)
}
socket := filepath.Join(r.socketDir, daemonFile)
args := []string{"serve", "-listen", "unix;" + socket}
cmd := exec.Command(r.goplsPath, args...)
cmd.Env = append(os.Environ(), runTestAsGoplsEnvvar+"=true")
var stderr bytes.Buffer
cmd.Stderr = &stderr
go func() {
if err := cmd.Run(); err != nil {
panic(fmt.Sprintf("error running external gopls: %v\nstderr:\n%s", err, stderr.String()))
}
}()
return socket
}
// Close cleans up resource that have been allocated to this workspace. // Close cleans up resource that have been allocated to this workspace.
func (r *Runner) Close() error { func (r *Runner) Close() error {
return r.ts.Close() r.mu.Lock()
defer r.mu.Unlock()
if r.ts != nil {
r.ts.Close()
}
if r.socketDir != "" {
os.RemoveAll(r.socketDir)
}
return nil
} }
// Run executes the test function in the default configured gopls execution // Run executes the test function in the default configured gopls execution
@ -81,6 +153,7 @@ func (r *Runner) RunInMode(modes EnvMode, t *testing.T, filedata string, test fu
{"singleton", Singleton, r.singletonEnv}, {"singleton", Singleton, r.singletonEnv},
{"shared", Shared, r.sharedEnv}, {"shared", Shared, r.sharedEnv},
{"forwarded", Forwarded, r.forwardedEnv}, {"forwarded", Forwarded, r.forwardedEnv},
{"separate_process", SeparateProcess, r.separateProcessEnv},
} }
for _, tc := range tests { for _, tc := range tests {
@ -115,12 +188,23 @@ func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (servertest.Con
} }
func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) { func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
return r.ts, func() {} return r.getTestServer(), func() {}
} }
func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) { func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
forwarder := lsprpc.NewForwarder(r.ts.Addr, false) ts := r.getTestServer()
ts2 := servertest.NewTCPServer(ctx, forwarder) forwarder := lsprpc.NewForwarder("tcp", ts.Addr, false)
ts2 := servertest.NewPipeServer(ctx, forwarder)
cleanup := func() {
ts2.Close()
}
return ts2, cleanup
}
func (r *Runner) separateProcessEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
socket := r.getRemoteSocket(t)
forwarder := lsprpc.NewForwarder("unix", socket, false)
ts2 := servertest.NewPipeServer(ctx, forwarder)
cleanup := func() { cleanup := func() {
ts2.Close() ts2.Close()
} }

View File

@ -5,18 +5,33 @@
package regtest package regtest
import ( import (
"context"
"flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"testing" "testing"
"time" "time"
"golang.org/x/tools/internal/lsp" "golang.org/x/tools/internal/lsp"
"golang.org/x/tools/internal/lsp/cmd"
"golang.org/x/tools/internal/lsp/lsprpc" "golang.org/x/tools/internal/lsp/lsprpc"
"golang.org/x/tools/internal/tool"
)
var (
runSubprocessTests = flag.Bool("enable_gopls_subprocess_tests", false, "run regtests against a gopls subprocess")
goplsBinaryPath = flag.String("gopls_test_binary", "", "path to the gopls binary for use as a remote, for use with the -gopls_subprocess_testmode flag")
) )
var runner *Runner var runner *Runner
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
flag.Parse()
if os.Getenv("_GOPLS_TEST_BINARY_RUN_AS_GOPLS") == "true" {
tool.Main(context.Background(), cmd.New("gopls", "", nil, nil), os.Args[1:])
os.Exit(0)
}
// Override functions that would shut down the test process // Override functions that would shut down the test process
defer func(lspExit, forwarderExit func(code int)) { defer func(lspExit, forwarderExit func(code int)) {
lsp.ServerExitFunc = lspExit lsp.ServerExitFunc = lspExit
@ -28,7 +43,36 @@ func TestMain(m *testing.M) {
} }
// We don't want our forwarders to exit, but it's OK if they would have. // We don't want our forwarders to exit, but it's OK if they would have.
lsprpc.ForwarderExitFunc = func(code int) {} lsprpc.ForwarderExitFunc = func(code int) {}
runner = NewTestRunner(AllModes, 30*time.Second)
defer runner.Close() if *runSubprocessTests {
os.Exit(m.Run()) goplsPath := *goplsBinaryPath
if goplsPath == "" {
var err error
goplsPath, err = testBinaryPath()
if err != nil {
panic(fmt.Sprintf("finding test binary path: %v", err))
}
}
runner = NewTestRunner(NormalModes|SeparateProcess, 30*time.Second, goplsPath)
} else {
runner = NewTestRunner(NormalModes, 30*time.Second, "")
}
code := m.Run()
runner.Close()
os.Exit(code)
}
func testBinaryPath() (string, error) {
pth := os.Args[0]
if !filepath.IsAbs(pth) {
cwd, err := os.Getwd()
if err == nil {
return "", fmt.Errorf("os.Getwd: %v", err)
}
pth = filepath.Join(cwd, pth)
}
if _, err := os.Stat(pth); err != nil {
return "", fmt.Errorf("os.Stat: %v", err)
}
return pth, nil
} }

View File

@ -26,7 +26,9 @@ func main() {
}` }`
func runShared(t *testing.T, program string, testFunc func(ctx context.Context, t *testing.T, env1 *Env, env2 *Env)) { func runShared(t *testing.T, program string, testFunc func(ctx context.Context, t *testing.T, env1 *Env, env2 *Env)) {
runner.RunInMode(Forwarded, t, sharedProgram, func(ctx context.Context, t *testing.T, env1 *Env) { // Only run these tests in forwarded modes.
modes := runner.Modes() & (Forwarded | SeparateProcess)
runner.RunInMode(modes, t, sharedProgram, func(ctx context.Context, t *testing.T, env1 *Env) {
// Create a second test session connected to the same workspace and server // Create a second test session connected to the same workspace and server
// as the first. // as the first.
env2 := NewEnv(ctx, t, env1.W, env1.Server) env2 := NewEnv(ctx, t, env1.W, env1.Server)
@ -36,11 +38,7 @@ func runShared(t *testing.T, program string, testFunc func(ctx context.Context,
func TestSimultaneousEdits(t *testing.T) { func TestSimultaneousEdits(t *testing.T) {
t.Parallel() t.Parallel()
runner.Run(t, exampleProgram, func(ctx context.Context, t *testing.T, env1 *Env) { runShared(t, exampleProgram, func(ctx context.Context, t *testing.T, env1 *Env, env2 *Env) {
// Create a second test session connected to the same workspace and server
// as the first.
env2 := NewEnv(ctx, t, env1.W, env1.Server)
// In editor #1, break fmt.Println as before. // In editor #1, break fmt.Println as before.
edit1 := fake.NewEdit(5, 11, 5, 12, "") edit1 := fake.NewEdit(5, 11, 5, 12, "")
env1.OpenFile("main.go") env1.OpenFile("main.go")