diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index c4e8a6bc7b..a76f2b65a9 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -66,7 +66,7 @@ func (client *Client) send(c *Call) { client.sending.Unlock(); } -func (client *Client) serve() { +func (client *Client) input() { var err os.Error; for err == nil { response := new(Response); @@ -107,7 +107,7 @@ func NewClient(conn io.ReadWriteCloser) *Client { client.enc = gob.NewEncoder(conn); client.dec = gob.NewDecoder(conn); client.pending = make(map[uint64] *Call); - go client.serve(); + go client.input(); return client; } diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go index 79feccc699..dadfae0c94 100644 --- a/src/pkg/rpc/server.go +++ b/src/pkg/rpc/server.go @@ -14,7 +14,6 @@ import ( "reflect"; "strings"; "sync"; - "time"; // See TODO in serve() "unicode"; "utf8"; ) @@ -174,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re sendResponse(sending, req, replyv.Interface(), enc, errmsg); } -func (server *serverType) serve(conn io.ReadWriteCloser) { +func (server *serverType) input(conn io.ReadWriteCloser) { dec := gob.NewDecoder(conn); enc := gob.NewEncoder(conn); sending := new(sync.Mutex); @@ -183,28 +182,32 @@ func (server *serverType) serve(conn io.ReadWriteCloser) { req := new(Request); err := dec.Decode(req); if err != nil { + if err == os.EOF || err == io.ErrUnexpectedEOF { + log.Stderr("rpc: ", err); + break; + } s := "rpc: server cannot decode request: " + err.String(); sendResponse(sending, req, invalidRequest, enc, s); - break; + continue; } serviceMethod := strings.Split(req.ServiceMethod, ".", 0); if len(serviceMethod) != 2 { s := "rpc: service/method request ill:formed: " + req.ServiceMethod; sendResponse(sending, req, invalidRequest, enc, s); - break; + continue; } // Look up the request. service, ok := server.serviceMap[serviceMethod[0]]; if !ok { s := "rpc: can't find service " + req.ServiceMethod; sendResponse(sending, req, invalidRequest, enc, s); - break; + continue; } mtype, ok := service.method[serviceMethod[1]]; if !ok { s := "rpc: can't find method " + req.ServiceMethod; sendResponse(sending, req, invalidRequest, enc, s); - break; + continue; } method := mtype.method; // Decode the argument value. @@ -212,18 +215,12 @@ func (server *serverType) serve(conn io.ReadWriteCloser) { replyv := _new(mtype.replyType); err = dec.Decode(argv.Interface()); if err != nil { - log.Stderr("tearing down connection:", err); + log.Stderr("rpc: tearing down", serviceMethod[0], "connection:", err); sendResponse(sending, req, replyv.Interface(), enc, err.String()); - break; + continue; } go service.call(sending, method.Func, req, argv, replyv, enc); } - // TODO(r): Gobs cannot handle unexpected data yet. Once they can, we can - // ignore it and the connection can persist. For now, though, bad data - // ruins the connection and we must shut down. The sleep is necessary to - // guarantee all the data gets out before we close the connection, so the - // client can see the error description. - time.Sleep(2e9); conn.Close(); } @@ -233,7 +230,7 @@ func (server *serverType) accept(lis net.Listener) { if err != nil { log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit? } - go server.serve(conn); + go server.input(conn); } } @@ -250,7 +247,7 @@ func Add(rcvr interface{}) os.Error { // ServeConn runs the server on a single connection. When the connection // completes, service terminates. func ServeConn(conn io.ReadWriteCloser) { - go server.serve(conn) + go server.input(conn) } // Accept accepts connections on the listener and serves requests @@ -276,7 +273,7 @@ func serveHTTP(c *http.Conn, req *http.Request) { return; } io.WriteString(conn, "HTTP/1.0 " + connected + "\n\n"); - server.serve(conn); + server.input(conn); } // HandleHTTP registers an HTTP handler for RPC messages.