From 17bfa32fde692b3d299ced5e1879b4f6bfefbfac Mon Sep 17 00:00:00 2001 From: Gustavo Niemeyer Date: Wed, 1 Jun 2011 20:30:42 -0300 Subject: [PATCH] sync: always wake up previously sleeping goroutines on Cond.Signal This changes the internal implementation of Cond so that it uses two generations of waiters. This enables Signal to guarantee that it will only wake up waiters that are currently sleeping at the call time. Fixes #1648. R=dvyukov, gustavo, rsc CC=golang-dev https://golang.org/cl/4524083 --- src/pkg/sync/cond.go | 69 ++++++++++++++++++++++++++------------- src/pkg/sync/cond_test.go | 27 +++++++++++++++ 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/src/pkg/sync/cond.go b/src/pkg/sync/cond.go index ea48f2e7a9..75494b5353 100644 --- a/src/pkg/sync/cond.go +++ b/src/pkg/sync/cond.go @@ -14,10 +14,26 @@ import "runtime" // which must be held when changing the condition and // when calling the Wait method. type Cond struct { - L Locker // held while observing or changing the condition - m Mutex // held to avoid internal races - waiters int // number of goroutines blocked on Wait - sema *uint32 + L Locker // held while observing or changing the condition + m Mutex // held to avoid internal races + + // We must be careful to make sure that when Signal + // releases a semaphore, the corresponding acquire is + // executed by a goroutine that was already waiting at + // the time of the call to Signal, not one that arrived later. + // To ensure this, we segment waiting goroutines into + // generations punctuated by calls to Signal. Each call to + // Signal begins another generation if there are no goroutines + // left in older generations for it to wake. Because of this + // optimization (only begin another generation if there + // are no older goroutines left), we only need to keep track + // of the two most recent generations, which we call old + // and new. + oldWaiters int // number of waiters in old generation... + oldSema *uint32 // ... waiting on this semaphore + + newWaiters int // number of waiters in new generation... + newSema *uint32 // ... waiting on this semaphore } // NewCond returns a new Cond with Locker l. @@ -42,11 +58,11 @@ func NewCond(l Locker) *Cond { // func (c *Cond) Wait() { c.m.Lock() - if c.sema == nil { - c.sema = new(uint32) + if c.newSema == nil { + c.newSema = new(uint32) } - s := c.sema - c.waiters++ + s := c.newSema + c.newWaiters++ c.m.Unlock() c.L.Unlock() runtime.Semacquire(s) @@ -59,9 +75,16 @@ func (c *Cond) Wait() { // during the call. func (c *Cond) Signal() { c.m.Lock() - if c.waiters > 0 { - c.waiters-- - runtime.Semrelease(c.sema) + if c.oldWaiters == 0 && c.newWaiters > 0 { + // Retire old generation; rename new to old. + c.oldWaiters = c.newWaiters + c.oldSema = c.newSema + c.newWaiters = 0 + c.newSema = nil + } + if c.oldWaiters > 0 { + c.oldWaiters-- + runtime.Semrelease(c.oldSema) } c.m.Unlock() } @@ -72,19 +95,19 @@ func (c *Cond) Signal() { // during the call. func (c *Cond) Broadcast() { c.m.Lock() - if c.waiters > 0 { - s := c.sema - n := c.waiters - for i := 0; i < n; i++ { - runtime.Semrelease(s) + // Wake both generations. + if c.oldWaiters > 0 { + for i := 0; i < c.oldWaiters; i++ { + runtime.Semrelease(c.oldSema) } - // We just issued n wakeups via the semaphore s. - // To ensure that they wake up the existing waiters - // and not waiters that arrive after Broadcast returns, - // clear c.sema. The next operation will allocate - // a new one. - c.sema = nil - c.waiters = 0 + c.oldWaiters = 0 + } + if c.newWaiters > 0 { + for i := 0; i < c.newWaiters; i++ { + runtime.Semrelease(c.newSema) + } + c.newWaiters = 0 + c.newSema = nil } c.m.Unlock() } diff --git a/src/pkg/sync/cond_test.go b/src/pkg/sync/cond_test.go index 846f98bf39..cefacb184e 100644 --- a/src/pkg/sync/cond_test.go +++ b/src/pkg/sync/cond_test.go @@ -46,6 +46,33 @@ func TestCondSignal(t *testing.T) { c.Signal() } +func TestCondSignalGenerations(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 100 + running := make(chan bool, n) + awake := make(chan int, n) + for i := 0; i < n; i++ { + go func(i int) { + m.Lock() + running <- true + c.Wait() + awake <- i + m.Unlock() + }(i) + if i > 0 { + a := <-awake + if a != i-1 { + t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a) + } + } + <-running + m.Lock() + c.Signal() + m.Unlock() + } +} + func TestCondBroadcast(t *testing.T) { var m Mutex c := NewCond(&m)