1
0
mirror of https://github.com/golang/go synced 2024-11-05 11:36:10 -07:00

internal/jsonrpc2: change StreamServer to operate on Conn instead of Stream

Really the name is wrong now, but this is just a stepping stone towards removing
it entirely in favour of a new listener/dialer/server/client pattern, so I am
minimizing the churn by leaving the names alone for now.

Change-Id: I771d117490763ebe05ed2a8c52d490deeb4d5333
Reviewed-on: https://go-review.googlesource.com/c/tools/+/232878
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-05-07 16:32:20 -04:00
parent d3396bb197
commit ef124de36d
4 changed files with 15 additions and 15 deletions

View File

@ -20,25 +20,24 @@ import (
// semantics. // semantics.
// A StreamServer is used to serve incoming jsonrpc2 clients communicating over // A StreamServer is used to serve incoming jsonrpc2 clients communicating over
// a newly created stream. // a newly created connection.
type StreamServer interface { type StreamServer interface {
ServeStream(context.Context, Stream) error ServeStream(context.Context, Conn) error
} }
// The ServerFunc type is an adapter that implements the StreamServer interface // The ServerFunc type is an adapter that implements the StreamServer interface
// using an ordinary function. // using an ordinary function.
type ServerFunc func(context.Context, Stream) error type ServerFunc func(context.Context, Conn) error
// ServeStream calls f(ctx, s). // ServeStream calls f(ctx, s).
func (f ServerFunc) ServeStream(ctx context.Context, s Stream) error { func (f ServerFunc) ServeStream(ctx context.Context, c Conn) error {
return f(ctx, s) return f(ctx, c)
} }
// HandlerServer returns a StreamServer that handles incoming streams using the // HandlerServer returns a StreamServer that handles incoming streams using the
// provided handler. // provided handler.
func HandlerServer(h Handler) StreamServer { func HandlerServer(h Handler) StreamServer {
return ServerFunc(func(ctx context.Context, s Stream) error { return ServerFunc(func(ctx context.Context, conn Conn) error {
conn := NewConn(s)
conn.Go(ctx, h) conn.Go(ctx, h)
<-conn.Done() <-conn.Done()
return conn.Err() return conn.Err()
@ -82,7 +81,7 @@ func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeou
nc, err := ln.Accept() nc, err := ln.Accept()
if err != nil { if err != nil {
select { select {
case doneListening <- fmt.Errorf("Accept(): %v", err): case doneListening <- fmt.Errorf("Accept(): %w", err):
case <-ctx.Done(): case <-ctx.Done():
} }
return return
@ -99,7 +98,8 @@ func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeou
connTimer.Stop() connTimer.Stop()
stream := NewHeaderStream(netConn) stream := NewHeaderStream(netConn)
go func() { go func() {
closedConns <- server.ServeStream(ctx, stream) conn := NewConn(stream)
closedConns <- server.ServeStream(ctx, conn)
stream.Close() stream.Close()
}() }()
case err := <-doneListening: case err := <-doneListening:

View File

@ -88,7 +88,8 @@ func (s *PipeServer) Connect(ctx context.Context) jsonrpc2.Conn {
cPipe.Close() cPipe.Close()
}) })
serverStream := s.framer(sPipe) serverStream := s.framer(sPipe)
go s.server.ServeStream(ctx, serverStream) serverConn := jsonrpc2.NewConn(serverStream)
go s.server.ServeStream(ctx, serverConn)
clientStream := s.framer(cPipe) clientStream := s.framer(cPipe)
return jsonrpc2.NewConn(clientStream) return jsonrpc2.NewConn(clientStream)

View File

@ -97,7 +97,8 @@ func (s *Serve) Run(ctx context.Context, args ...string) error {
if s.Trace && di != nil { if s.Trace && di != nil {
stream = protocol.LoggingStream(stream, di.LogWriter) stream = protocol.LoggingStream(stream, di.LogWriter)
} }
return ss.ServeStream(ctx, stream) conn := jsonrpc2.NewConn(stream)
return ss.ServeStream(ctx, conn)
} }
// parseAddr parses the -listen flag in to a network, and address. // parseAddr parses the -listen flag in to a network, and address.

View File

@ -105,10 +105,9 @@ func (c debugClient) ServerID() string {
// ServeStream implements the jsonrpc2.StreamServer interface, by handling // ServeStream implements the jsonrpc2.StreamServer interface, by handling
// incoming streams using a new lsp server. // incoming streams using a new lsp server.
func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error { func (s *StreamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) error {
index := atomic.AddInt64(&clientIndex, 1) index := atomic.AddInt64(&clientIndex, 1)
conn := jsonrpc2.NewConn(stream)
client := protocol.ClientDispatcher(conn) client := protocol.ClientDispatcher(conn)
session := s.cache.NewSession(ctx) session := s.cache.NewSession(ctx)
dc := &debugClient{ dc := &debugClient{
@ -245,8 +244,7 @@ func QueryServerState(ctx context.Context, network, address string) (*ServerStat
// ServeStream dials the forwarder remote and binds the remote to serve the LSP // ServeStream dials the forwarder remote and binds the remote to serve the LSP
// on the incoming stream. // on the incoming stream.
func (f *Forwarder) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error { func (f *Forwarder) ServeStream(ctx context.Context, clientConn jsonrpc2.Conn) error {
clientConn := jsonrpc2.NewConn(stream)
client := protocol.ClientDispatcher(clientConn) client := protocol.ClientDispatcher(clientConn)
netConn, err := f.connectToRemote(ctx) netConn, err := f.connectToRemote(ctx)