diff --git a/src/pkg/Makefile b/src/pkg/Makefile index a9c400a9ce..6eee658a52 100644 --- a/src/pkg/Makefile +++ b/src/pkg/Makefile @@ -104,6 +104,7 @@ DIRS=\ reflect\ regexp\ rpc\ + rpc/jsonrpc\ runtime\ runtime/pprof\ scanner\ diff --git a/src/pkg/rpc/jsonrpc/Makefile b/src/pkg/rpc/jsonrpc/Makefile new file mode 100644 index 0000000000..1a4fd2e92c --- /dev/null +++ b/src/pkg/rpc/jsonrpc/Makefile @@ -0,0 +1,12 @@ +# Copyright 2010 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +include ../../../Make.$(GOARCH) + +TARG=rpc/jsonrpc +GOFILES=\ + client.go\ + server.go\ + +include ../../../Make.pkg diff --git a/src/pkg/rpc/jsonrpc/all_test.go b/src/pkg/rpc/jsonrpc/all_test.go new file mode 100644 index 0000000000..e94c594da0 --- /dev/null +++ b/src/pkg/rpc/jsonrpc/all_test.go @@ -0,0 +1,147 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc + +import ( + "fmt" + "json" + "net" + "os" + "rpc" + "testing" +) + +type Args struct { + A, B int +} + +type Reply struct { + C int +} + +type Arith int + +func (t *Arith) Add(args *Args, reply *Reply) os.Error { + reply.C = args.A + args.B + return nil +} + +func (t *Arith) Mul(args *Args, reply *Reply) os.Error { + reply.C = args.A * args.B + return nil +} + +func (t *Arith) Div(args *Args, reply *Reply) os.Error { + if args.B == 0 { + return os.ErrorString("divide by zero") + } + reply.C = args.A / args.B + return nil +} + +func (t *Arith) Error(args *Args, reply *Reply) os.Error { + panic("ERROR") +} + +func init() { + rpc.Register(new(Arith)) +} + +func TestServer(t *testing.T) { + type addResp struct { + Id interface{} "id" + Result Reply "result" + Error string "error" + } + + cli, srv := net.Pipe() + defer cli.Close() + go ServeConn(srv) + dec := json.NewDecoder(cli) + + // Send hand-coded requests to server, parse responses. + for i := 0; i < 10; i++ { + fmt.Fprintf(cli, `{"method": "Arith.Add", "id": "\u%04d", "params": [{"A": %d, "B": %d}]}`, i, i, i+1) + var resp addResp + err := dec.Decode(&resp) + if err != nil { + t.Fatalf("Decode: %s", err) + } + if resp.Error != "" { + t.Fatalf("resp.Error: %s", resp.Error) + } + if resp.Id.(string) != string(i) { + t.Fatalf("resp: bad id %q want %q", resp.Id.(string), string(i)) + } + if resp.Result.C != 2*i+1 { + t.Fatalf("resp: bad result: %d+%d=%d", i, i+1, resp.Result.C) + } + } +} + +func TestClient(t *testing.T) { + // Assume server is okay (TestServer is above). + // Test client against server. + cli, srv := net.Pipe() + go ServeConn(srv) + + client := NewClient(cli) + defer client.Close() + + // Synchronous calls + args := &Args{7, 8} + reply := new(Reply) + err := client.Call("Arith.Add", args, reply) + if err != nil { + t.Errorf("Add: expected no error but got string %q", err.String()) + } + if reply.C != args.A+args.B { + t.Errorf("Add: expected %d got %d", reply.C, args.A+args.B) + } + + args = &Args{7, 8} + reply = new(Reply) + err = client.Call("Arith.Mul", args, reply) + if err != nil { + t.Errorf("Mul: expected no error but got string %q", err.String()) + } + if reply.C != args.A*args.B { + t.Errorf("Mul: expected %d got %d", reply.C, args.A*args.B) + } + + // Out of order. + args = &Args{7, 8} + mulReply := new(Reply) + mulCall := client.Go("Arith.Mul", args, mulReply, nil) + addReply := new(Reply) + addCall := client.Go("Arith.Add", args, addReply, nil) + + addCall = <-addCall.Done + if addCall.Error != nil { + t.Errorf("Add: expected no error but got string %q", addCall.Error.String()) + } + if addReply.C != args.A+args.B { + t.Errorf("Add: expected %d got %d", addReply.C, args.A+args.B) + } + + mulCall = <-mulCall.Done + if mulCall.Error != nil { + t.Errorf("Mul: expected no error but got string %q", mulCall.Error.String()) + } + if mulReply.C != args.A*args.B { + t.Errorf("Mul: expected %d got %d", mulReply.C, args.A*args.B) + } + + // Error test + args = &Args{7, 0} + reply = new(Reply) + err = client.Call("Arith.Div", args, reply) + // expect an error: zero divide + if err == nil { + t.Error("Div: expected error") + } else if err.String() != "divide by zero" { + t.Error("Div: expected divide by zero error; got", err) + } +} diff --git a/src/pkg/rpc/jsonrpc/client.go b/src/pkg/rpc/jsonrpc/client.go new file mode 100644 index 0000000000..ed2b4ed379 --- /dev/null +++ b/src/pkg/rpc/jsonrpc/client.go @@ -0,0 +1,110 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package jsonrpc implements a JSON-RPC ClientCodec and ServerCodec +// for the rpc package. +package jsonrpc + +import ( + "io" + "json" + "net" + "os" + "rpc" + "sync" +) + +type clientCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req clientRequest + resp clientResponse + + // JSON-RPC responses include the request id but not the request method. + // Package rpc expects both. + // We save the request method in pending when sending a request + // and then look it up by request ID when filling out the rpc Response. + mutex sync.Mutex // protects pending + pending map[uint64]string // map request id to method name +} + +// NewClientCodec returns a new rpc.ClientCodec using JSON-RPC on conn. +func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { + return &clientCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]string), + } +} + +type clientRequest struct { + Method string "method" + Params [1]interface{} "params" + Id uint64 "id" +} + +func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) os.Error { + c.mutex.Lock() + c.pending[r.Seq] = r.ServiceMethod + c.mutex.Unlock() + c.req.Method = r.ServiceMethod + c.req.Params[0] = param + c.req.Id = r.Seq + return c.enc.Encode(&c.req) +} + +type clientResponse struct { + Id uint64 "id" + Result *json.RawMessage "result" + Error string "error" +} + +func (r *clientResponse) reset() { + r.Id = 0 + r.Result = nil + r.Error = "" +} + +func (c *clientCodec) ReadResponseHeader(r *rpc.Response) os.Error { + c.resp.reset() + if err := c.dec.Decode(&c.resp); err != nil { + return err + } + + c.mutex.Lock() + r.ServiceMethod = c.pending[c.resp.Id] + c.pending[c.resp.Id] = "", false + c.mutex.Unlock() + + r.Seq = c.resp.Id + r.Error = c.resp.Error + return nil +} + +func (c *clientCodec) ReadResponseBody(x interface{}) os.Error { + return json.Unmarshal(*c.resp.Result, x) +} + +func (c *clientCodec) Close() os.Error { + return c.c.Close() +} + +// NewClient returns a new rpc.Client to handle requests to the +// set of services at the other end of the connection. +func NewClient(conn io.ReadWriteCloser) *rpc.Client { + return rpc.NewClientWithCodec(NewClientCodec(conn)) +} + +// Dial connects to a JSON-RPC server at the specified network address. +func Dial(network, address string) (*rpc.Client, os.Error) { + conn, err := net.Dial(network, "", address) + if err != nil { + return nil, err + } + return NewClient(conn), err +} diff --git a/src/pkg/rpc/jsonrpc/server.go b/src/pkg/rpc/jsonrpc/server.go new file mode 100644 index 0000000000..9f3472a39c --- /dev/null +++ b/src/pkg/rpc/jsonrpc/server.go @@ -0,0 +1,123 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc + +import ( + "io" + "json" + "os" + "rpc" + "sync" +) + +type serverCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req serverRequest + resp serverResponse + + // JSON-RPC clients can use arbitrary json values as request IDs. + // Package rpc expects uint64 request IDs. + // We assign uint64 sequence numbers to incoming requests + // but save the original request ID in the pending map. + // When rpc responds, we use the sequence number in + // the response to find the original request ID. + mutex sync.Mutex // protects seq, pending + seq uint64 + pending map[uint64]*json.RawMessage +} + +// NewServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. +func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return &serverCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]*json.RawMessage), + } +} + +type serverRequest struct { + Method string "method" + Params *json.RawMessage "params" + Id *json.RawMessage "id" +} + +func (r *serverRequest) reset() { + r.Method = "" + if r.Params != nil { + *r.Params = (*r.Params)[0:0] + } + if r.Id != nil { + *r.Id = (*r.Id)[0:0] + } +} + +type serverResponse struct { + Id *json.RawMessage "id" + Result interface{} "result" + Error string "error" +} + +func (c *serverCodec) ReadRequestHeader(r *rpc.Request) os.Error { + c.req.reset() + if err := c.dec.Decode(&c.req); err != nil { + return err + } + r.ServiceMethod = c.req.Method + + // JSON request id can be any JSON value; + // RPC package expects uint64. Translate to + // internal uint64 and save JSON on the side. + c.mutex.Lock() + c.seq++ + c.pending[c.seq] = c.req.Id + c.req.Id = nil + r.Seq = c.seq + c.mutex.Unlock() + + return nil +} + +func (c *serverCodec) ReadRequestBody(x interface{}) os.Error { + // JSON params is array value. + // RPC params is struct. + // Unmarshal into array containing struct for now. + // Should think about making RPC more general. + var params [1]interface{} + params[0] = x + return json.Unmarshal(*c.req.Params, ¶ms) +} + +func (c *serverCodec) WriteResponse(r *rpc.Response, x interface{}) os.Error { + var resp serverResponse + c.mutex.Lock() + b, ok := c.pending[r.Seq] + if !ok { + c.mutex.Unlock() + return os.NewError("invalid sequence number in response") + } + c.pending[r.Seq] = nil, false + c.mutex.Unlock() + + resp.Id = b + resp.Result = x + resp.Error = r.Error + return c.enc.Encode(resp) +} + +func (c *serverCodec) Close() os.Error { + return c.c.Close() +} + +// ServeConn runs the JSON-RPC server on a single connection. +// ServeConn blocks, serving the connection until the client hangs up. +// The caller typically invokes ServeConn in a go statement. +func ServeConn(conn io.ReadWriteCloser) { + rpc.ServeCodec(NewServerCodec(conn)) +}