From 250ac87368f25bf1c35ffd6f522639fea119dc2d Mon Sep 17 00:00:00 2001 From: Andrew Gerrand Date: Thu, 28 Oct 2010 11:05:56 +1100 Subject: [PATCH] rpc: expose Server type to allow multiple RPC Server instances R=r, rsc, msolo, sougou CC=golang-dev https://golang.org/cl/2696041 --- src/pkg/rpc/client.go | 11 +++- src/pkg/rpc/debug.go | 6 +- src/pkg/rpc/server.go | 127 ++++++++++++++++++++++++------------- src/pkg/rpc/server_test.go | 71 ++++++++++++++++----- 4 files changed, 152 insertions(+), 63 deletions(-) diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index 98b992bae0..2f52d19c6e 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -162,14 +162,21 @@ func (c *gobClientCodec) Close() os.Error { } -// DialHTTP connects to an HTTP RPC server at the specified network address. +// DialHTTP connects to an HTTP RPC server at the specified network address +// listening on the default HTTP RPC path. func DialHTTP(network, address string) (*Client, os.Error) { + return DialHTTPPath(network, address, DefaultRPCPath) +} + +// DialHTTPPath connects to an HTTP RPC server +// at the specified network address and path. +func DialHTTPPath(network, address, path string) (*Client, os.Error) { var err os.Error conn, err := net.Dial(network, "", address) if err != nil { return nil, err } - io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n") + io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") // Require successful HTTP response // before switching to RPC protocol. diff --git a/src/pkg/rpc/debug.go b/src/pkg/rpc/debug.go index e1e53f0905..6bd8a91fef 100644 --- a/src/pkg/rpc/debug.go +++ b/src/pkg/rpc/debug.go @@ -61,8 +61,12 @@ func (m methodArray) Len() int { return len(m) } func (m methodArray) Less(i, j int) bool { return m[i].name < m[j].name } func (m methodArray) Swap(i, j int) { m[i], m[j] = m[j], m[i] } +type debugHTTP struct { + *Server +} + // Runs at /debug/rpc -func debugHTTP(w http.ResponseWriter, req *http.Request) { +func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Build a sorted version of the data. var services = make(serviceArray, len(server.serviceMap)) i := 0 diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go index 792201515e..dbb68dde84 100644 --- a/src/pkg/rpc/server.go +++ b/src/pkg/rpc/server.go @@ -3,9 +3,9 @@ // license that can be found in the LICENSE file. /* - The rpc package provides access to the public methods of an object across a + The rpc package provides access to the exported methods of an object across a network or other I/O connection. A server registers an object, making it visible - as a service with the name of the type of the object. After registration, public + as a service with the name of the type of the object. After registration, exported methods of the object will be accessible remotely. A server may register multiple objects (services) of different types but it is an error to register multiple objects of the same type. @@ -13,8 +13,8 @@ Only methods that satisfy these criteria will be made available for remote access; other methods will be ignored: - - the method receiver and name are publicly visible, that is, begin with an upper case letter. - - the method has two arguments, both pointers to publicly visible types. + - the method receiver and name are exported, that is, begin with an upper case letter. + - the method has two arguments, both pointers to exported types. - the method has return type os.Error. The method's first argument represents the arguments provided by the caller; the @@ -123,6 +123,12 @@ import ( "utf8" ) +const ( + // Defaults used by HandleHTTP + DefaultRPCPath = "/_goRPC_" + DefaultDebugPath = "/debug/rpc" +) + // Precompute the reflect type for os.Error. Can't use os.Error directly // because Typeof takes an empty interface value. This is annoying. var unusedError *os.Error @@ -166,23 +172,34 @@ type ClientInfo struct { RemoteAddr string } -type serverType struct { +// Server represents an RPC Server. +type Server struct { sync.Mutex // protects the serviceMap serviceMap map[string]*service } -// This variable is a global whose "public" methods are really private methods -// called from the global functions of this package: rpc.Register, rpc.ServeConn, etc. -// For example, rpc.Register() calls server.add(). -var server = &serverType{serviceMap: make(map[string]*service)} +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{serviceMap: make(map[string]*service)} +} -// Is this a publicly visible - upper case - name? -func isPublic(name string) bool { +// DefaultServer is the default instance of *Server. +var DefaultServer = NewServer() + +// Is this an exported - upper case - name? +func isExported(name string) bool { rune, _ := utf8.DecodeRuneInString(name) return unicode.IsUpper(rune) } -func (server *serverType) register(rcvr interface{}) os.Error { +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method +// - two arguments, both pointers to exported structs +// - one return value, of type os.Error +// It returns an error if the receiver is not an exported type or has no +// suitable methods. +func (server *Server) Register(rcvr interface{}) os.Error { server.Lock() defer server.Unlock() if server.serviceMap == nil { @@ -195,8 +212,8 @@ func (server *serverType) register(rcvr interface{}) os.Error { if sname == "" { log.Exit("rpc: no service name for type", s.typ.String()) } - if s.typ.PkgPath() != "" && !isPublic(sname) { - s := "rpc Register: type " + sname + " is not public" + if s.typ.PkgPath() != "" && !isExported(sname) { + s := "rpc Register: type " + sname + " is not exported" log.Print(s) return os.ErrorString(s) } @@ -211,7 +228,7 @@ func (server *serverType) register(rcvr interface{}) os.Error { method := s.typ.Method(m) mtype := method.Type mname := method.Name - if mtype.PkgPath() != "" || !isPublic(mname) { + if mtype.PkgPath() != "" || !isExported(mname) { continue } // Method needs three ins: receiver, *args, *reply. @@ -229,12 +246,12 @@ func (server *serverType) register(rcvr interface{}) os.Error { log.Println(mname, "reply type not a pointer:", mtype.In(2)) continue } - if argType.Elem().PkgPath() != "" && !isPublic(argType.Elem().Name()) { - log.Println(mname, "argument type not public:", argType) + if argType.Elem().PkgPath() != "" && !isExported(argType.Elem().Name()) { + log.Println(mname, "argument type not exported:", argType) continue } - if replyType.Elem().PkgPath() != "" && !isPublic(replyType.Elem().Name()) { - log.Println(mname, "reply type not public:", replyType) + if replyType.Elem().PkgPath() != "" && !isExported(replyType.Elem().Name()) { + log.Println(mname, "reply type not exported:", replyType) continue } if mtype.NumIn() == 4 { @@ -257,7 +274,7 @@ func (server *serverType) register(rcvr interface{}) os.Error { } if len(s.method) == 0 { - s := "rpc Register: type " + sname + " has no public methods of suitable type" + s := "rpc Register: type " + sname + " has no exported methods of suitable type" log.Print(s) return os.ErrorString(s) } @@ -335,7 +352,19 @@ func (c *gobServerCodec) Close() os.Error { return c.rwc.Close() } -func (server *serverType) input(codec ServerCodec) { + +// ServeConn runs the server on a single connection. +// ServeConn blocks, serving the connection until the client hangs up. +// The caller typically invokes ServeConn in a go statement. +// ServeConn uses the gob wire format (see package gob) on the +// connection. To use an alternate codec, use ServeCodec. +func (server *Server) ServeConn(conn io.ReadWriteCloser) { + server.ServeCodec(&gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)}) +} + +// ServeCodec is like ServeConn but uses the specified codec to +// decode requests and encode responses. +func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) for { // Grab the request header. @@ -387,24 +416,27 @@ func (server *serverType) input(codec ServerCodec) { codec.Close() } -func (server *serverType) accept(lis net.Listener) { +// Accept accepts connections on the listener and serves requests +// for each incoming connection. Accept blocks; the caller typically +// invokes it in a go statement. +func (server *Server) Accept(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Exit("rpc.Serve: accept:", err.String()) // TODO(r): exit? } - go ServeConn(conn) + go server.ServeConn(conn) } } -// Register publishes in the server the set of methods of the -// receiver value that satisfy the following conditions: -// - public method -// - two arguments, both pointers to public structs -// - one return value of type os.Error -// It returns an error if the receiver is not public or has no +// Register publishes in the DefaultServer the set of methods +// of the receiver value that satisfy the following conditions: +// - exported method +// - two arguments, both pointers to exported structs +// - one return value, of type os.Error +// It returns an error if the receiver is not an exported type or has no // suitable methods. -func Register(rcvr interface{}) os.Error { return server.register(rcvr) } +func Register(rcvr interface{}) os.Error { return DefaultServer.Register(rcvr) } // A ServerCodec implements reading of RPC requests and writing of // RPC responses for the server side of an RPC session. @@ -420,36 +452,35 @@ type ServerCodec interface { Close() os.Error } -// ServeConn runs the server on a single connection. +// ServeConn runs the DefaultServer on a single connection. // ServeConn blocks, serving the connection until the client hangs up. // The caller typically invokes ServeConn in a go statement. // ServeConn uses the gob wire format (see package gob) on the // connection. To use an alternate codec, use ServeCodec. func ServeConn(conn io.ReadWriteCloser) { - ServeCodec(&gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)}) + DefaultServer.ServeConn(conn) } // ServeCodec is like ServeConn but uses the specified codec to // decode requests and encode responses. func ServeCodec(codec ServerCodec) { - server.input(codec) + DefaultServer.ServeCodec(codec) } // Accept accepts connections on the listener and serves requests -// for each incoming connection. Accept blocks; the caller typically -// invokes it in a go statement. -func Accept(lis net.Listener) { server.accept(lis) } +// to DefaultServer for each incoming connection. +// Accept blocks; the caller typically invokes it in a go statement. +func Accept(lis net.Listener) { DefaultServer.Accept(lis) } // Can connect to RPC service using HTTP CONNECT to rpcPath. -var rpcPath string = "/_goRPC_" -var debugPath string = "/debug/rpc" var connected = "200 Connected to Go RPC" -func serveHTTP(w http.ResponseWriter, req *http.Request) { +// ServeHTTP implements an http.Handler that answers RPC requests. +func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method != "CONNECT" { w.SetHeader("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) - io.WriteString(w, "405 must CONNECT to "+rpcPath+"\n") + io.WriteString(w, "405 must CONNECT\n") return } conn, _, err := w.Hijack() @@ -458,12 +489,20 @@ func serveHTTP(w http.ResponseWriter, req *http.Request) { return } io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") - ServeConn(conn) + server.ServeConn(conn) } -// HandleHTTP registers an HTTP handler for RPC messages. +// HandleHTTP registers an HTTP handler for RPC messages on rpcPath, +// and a debugging handler on debugPath. +// It is still necessary to invoke http.Serve(), typically in a go statement. +func (server *Server) HandleHTTP(rpcPath, debugPath string) { + http.Handle(rpcPath, server) + http.Handle(debugPath, debugHTTP{server}) +} + +// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer +// on DefaultRPCPath and a debugging handler on DefaultDebugPath. // It is still necessary to invoke http.Serve(), typically in a go statement. func HandleHTTP() { - http.Handle(rpcPath, http.HandlerFunc(serveHTTP)) - http.Handle(debugPath, http.HandlerFunc(debugHTTP)) + DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) } diff --git a/src/pkg/rpc/server_test.go b/src/pkg/rpc/server_test.go index 1d4c48c958..e826904c2d 100644 --- a/src/pkg/rpc/server_test.go +++ b/src/pkg/rpc/server_test.go @@ -15,12 +15,16 @@ import ( "testing" ) -var serverAddr string -var httpServerAddr string -var once sync.Once - -const second = 1e9 +var ( + serverAddr, newServerAddr string + httpServerAddr string + once, newOnce, httpOnce sync.Once +) +const ( + second = 1e9 + newHttpPath = "/foo" +) type Args struct { A, B int @@ -64,23 +68,42 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error { panic("ERROR") } -func startServer() { - Register(new(Arith)) - +func listenTCP() (net.Listener, string) { l, e := net.Listen("tcp", "127.0.0.1:0") // any available address if e != nil { log.Exitf("net.Listen tcp :0: %v", e) } - serverAddr = l.Addr().String() + return l, l.Addr().String() +} + +func startServer() { + Register(new(Arith)) + + var l net.Listener + l, serverAddr = listenTCP() log.Println("Test RPC server listening on", serverAddr) go Accept(l) HandleHTTP() - l, e = net.Listen("tcp", "127.0.0.1:0") // any available address - if e != nil { - log.Printf("net.Listen tcp :0: %v", e) - os.Exit(1) - } + httpOnce.Do(startHttpServer) +} + +func startNewServer() { + s := NewServer() + s.Register(new(Arith)) + + var l net.Listener + l, newServerAddr = listenTCP() + log.Println("NewServer test RPC server listening on", newServerAddr) + go Accept(l) + + s.HandleHTTP(newHttpPath, "/bar") + httpOnce.Do(startHttpServer) +} + +func startHttpServer() { + var l net.Listener + l, httpServerAddr = listenTCP() httpServerAddr = l.Addr().String() log.Println("Test HTTP RPC server listening on", httpServerAddr) go http.Serve(l, nil) @@ -88,8 +111,13 @@ func startServer() { func TestRPC(t *testing.T) { once.Do(startServer) + testRPC(t, serverAddr) + newOnce.Do(startNewServer) + testRPC(t, newServerAddr) +} - client, err := Dial("tcp", serverAddr) +func testRPC(t *testing.T, addr string) { + client, err := Dial("tcp", addr) if err != nil { t.Fatal("dialing", err) } @@ -175,8 +203,19 @@ func TestRPC(t *testing.T) { func TestHTTPRPC(t *testing.T) { once.Do(startServer) + testHTTPRPC(t, "") + newOnce.Do(startNewServer) + testHTTPRPC(t, newHttpPath) +} - client, err := DialHTTP("tcp", httpServerAddr) +func testHTTPRPC(t *testing.T, path string) { + var client *Client + var err os.Error + if path == "" { + client, err = DialHTTP("tcp", httpServerAddr) + } else { + client, err = DialHTTPPath("tcp", httpServerAddr, path) + } if err != nil { t.Fatal("dialing", err) }