From a31f317a990ad663e24b3521a958705280c14733 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Wed, 31 Aug 2011 13:07:25 -0400 Subject: [PATCH] rpc: add benchmark for async rpc calls Also makes sync benchmark concurrent. R=r, rsc CC=golang-dev https://golang.org/cl/4911043 --- src/pkg/rpc/server_test.go | 91 +++++++++++++++++++++++++++++++++----- 1 file changed, 80 insertions(+), 11 deletions(-) diff --git a/src/pkg/rpc/server_test.go b/src/pkg/rpc/server_test.go index e7bbfbe97d2..cb2db2a65d3 100644 --- a/src/pkg/rpc/server_test.go +++ b/src/pkg/rpc/server_test.go @@ -14,6 +14,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" ) @@ -477,19 +478,79 @@ func benchmarkEndToEnd(dial func() (*Client, os.Error), b *testing.B) { // Synchronous calls args := &Args{7, 8} - reply := new(Reply) + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N) + var wg sync.WaitGroup + wg.Add(procs) b.StartTimer() - for i := 0; i < b.N; i++ { - err = client.Call("Arith.Add", args, reply) - if err != nil { - fmt.Printf("Add: expected no error but got string %q", err.String()) - break - } - if reply.C != args.A+args.B { - fmt.Printf("Add: expected %d got %d", reply.C, args.A+args.B) - break - } + + for p := 0; p < procs; p++ { + go func() { + reply := new(Reply) + for atomic.AddInt32(&N, -1) >= 0 { + err = client.Call("Arith.Add", args, reply) + if err != nil { + fmt.Printf("Add: expected no error but got string %q", err.String()) + panic("rpc error") + } + if reply.C != args.A+args.B { + fmt.Printf("Add: expected %d got %d", reply.C, args.A+args.B) + panic("rpc error") + } + } + wg.Done() + }() } + wg.Wait() +} + +func benchmarkEndToEndAsync(dial func() (*Client, os.Error), b *testing.B) { + const MaxConcurrentCalls = 100 + b.StopTimer() + once.Do(startServer) + client, err := dial() + if err != nil { + fmt.Println("error dialing", err) + return + } + + // Asynchronous calls + args := &Args{7, 8} + procs := 4 * runtime.GOMAXPROCS(-1) + send := int32(b.N) + recv := int32(b.N) + var wg sync.WaitGroup + wg.Add(procs) + gate := make(chan bool, MaxConcurrentCalls) + res := make(chan *Call, MaxConcurrentCalls) + b.StartTimer() + + for p := 0; p < procs; p++ { + go func() { + for atomic.AddInt32(&send, -1) >= 0 { + gate <- true + reply := new(Reply) + client.Go("Arith.Add", args, reply, res) + } + }() + go func() { + for call := range res { + a := call.Args.(*Args).A + b := call.Args.(*Args).B + c := call.Reply.(*Reply).C + if a+b != c { + fmt.Printf("Add: expected %d got %d", a+b, c) + panic("incorrect reply") + } + <-gate + if atomic.AddInt32(&recv, -1) == 0 { + close(res) + } + } + wg.Done() + }() + } + wg.Wait() } func BenchmarkEndToEnd(b *testing.B) { @@ -499,3 +560,11 @@ func BenchmarkEndToEnd(b *testing.B) { func BenchmarkEndToEndHTTP(b *testing.B) { benchmarkEndToEnd(dialHTTP, b) } + +func BenchmarkEndToEndAsync(b *testing.B) { + benchmarkEndToEndAsync(dialDirect, b) +} + +func BenchmarkEndToEndAsyncHTTP(b *testing.B) { + benchmarkEndToEndAsync(dialHTTP, b) +}