diff --git a/src/pkg/sync/Makefile b/src/pkg/sync/Makefile index fd8e5d9987e..7f57a2cfbc6 100644 --- a/src/pkg/sync/Makefile +++ b/src/pkg/sync/Makefile @@ -6,6 +6,7 @@ include ../../Make.inc TARG=sync GOFILES=\ + cond.go\ mutex.go\ once.go \ rwmutex.go\ diff --git a/src/pkg/sync/cond.go b/src/pkg/sync/cond.go new file mode 100644 index 00000000000..c99fda3cce9 --- /dev/null +++ b/src/pkg/sync/cond.go @@ -0,0 +1,89 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package sync + +import "runtime" + +// Cond implements a condition variable, a rendezvous point +// for goroutines waiting for or announcing the occurrence +// of an event. +// +// Each Cond has an associated Locker L (often a *Mutex or *RWMutex), +// which must be held when changing the condition and +// when calling the Wait method. +type Cond struct { + L Locker // held while observing or changing the condition + m Mutex // held to avoid internal races + waiters int // number of goroutines blocked on Wait + sema *uint32 +} + +// NewCond returns a new Cond with Locker l. +func NewCond(l Locker) *Cond { + return &Cond{L: l} +} + +// Wait atomically unlocks c.L and suspends execution +// of the calling goroutine. After later resuming execution, +// Wait locks c.L before returning. +// +// Because L is not locked when Wait first resumes, the caller +// typically cannot assume that the condition is true when +// Wait returns. Instead, the caller should Wait in a loop: +// +// c.L.Lock() +// for !condition() { +// c.Wait() +// } +// ... make use of condition ... +// c.L.Unlock() +// +func (c *Cond) Wait() { + c.m.Lock() + if c.sema == nil { + c.sema = new(uint32) + } + s := c.sema + c.waiters++ + c.m.Unlock() + c.L.Unlock() + runtime.Semacquire(s) + c.L.Lock() +} + +// Signal wakes one goroutine waiting on c, if there is any. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Signal() { + c.m.Lock() + if c.waiters > 0 { + c.waiters-- + runtime.Semrelease(c.sema) + } + c.m.Unlock() +} + +// Broadcast wakes all goroutines waiting on c. +// +// It is allowed but not required for the caller to hold c.L +// during the call. +func (c *Cond) Broadcast() { + c.m.Lock() + if c.waiters > 0 { + s := c.sema + n := c.waiters + for i := 0; i < n; i++ { + runtime.Semrelease(s) + } + // We just issued n wakeups via the semaphore s. + // To ensure that they wake up the existing waiters + // and not waiters that arrive after Broadcast returns, + // clear c.sema. The next operation will allocate + // a new one. + c.sema = nil + c.waiters = 0 + } + c.m.Unlock() +} diff --git a/src/pkg/sync/cond_test.go b/src/pkg/sync/cond_test.go new file mode 100644 index 00000000000..2b99c91bf3b --- /dev/null +++ b/src/pkg/sync/cond_test.go @@ -0,0 +1,99 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package sync_test + +import ( + . "sync" + "testing" +) + +func TestCondSignal(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 1000 + running := make(chan bool, n) + awake := make(chan bool, n) + for i := 0; i < n; i++ { + go func() { + m.Lock() + running <- true + c.Wait() + awake <- true + m.Unlock() + }() + } + for i := 0; i < n; i++ { + <-running // Wait for everyone to run. + } + for n > 0 { + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Signal() + m.Unlock() + <-awake // Will deadlock if no goroutine wakes up + select { + case <-awake: + t.Fatal("too many goroutines awake") + default: + } + n-- + } + c.Signal() +} + +func TestCondBroadcast(t *testing.T) { + var m Mutex + c := NewCond(&m) + n := 200 + running := make(chan int, n) + awake := make(chan int, n) + exit := false + for i := 0; i < n; i++ { + go func(g int) { + m.Lock() + for !exit { + running <- g + c.Wait() + awake <- g + } + m.Unlock() + }(i) + } + for i := 0; i < n; i++ { + for i := 0; i < n; i++ { + <-running // Will deadlock unless n are running. + } + if i == n-1 { + m.Lock() + exit = true + m.Unlock() + } + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Broadcast() + m.Unlock() + seen := make([]bool, n) + for i := 0; i < n; i++ { + g := <-awake + if seen[g] { + t.Fatal("goroutine woke up twice") + } + seen[g] = true + } + } + select { + case <-running: + t.Fatal("goroutine did not exit") + default: + } + c.Broadcast() +} diff --git a/src/pkg/sync/mutex.go b/src/pkg/sync/mutex.go index 2a1270b9c40..ff38691c89b 100644 --- a/src/pkg/sync/mutex.go +++ b/src/pkg/sync/mutex.go @@ -21,6 +21,12 @@ type Mutex struct { sema uint32 } +// A Locker represents an object that can be locked and unlocked. +type Locker interface { + Lock() + Unlock() +} + // Add delta to *val, and return the new *val in a thread-safe way. If multiple // goroutines call xadd on the same val concurrently, the changes will be // serialized, and all the deltas will be added in an undefined order. diff --git a/src/pkg/sync/rwmutex.go b/src/pkg/sync/rwmutex.go index 25696aca2fd..13f48a077ff 100644 --- a/src/pkg/sync/rwmutex.go +++ b/src/pkg/sync/rwmutex.go @@ -71,3 +71,14 @@ func (rw *RWMutex) Lock() { // goroutine. One goroutine may RLock (Lock) an RWMutex and then // arrange for another goroutine to RUnlock (Unlock) it. func (rw *RWMutex) Unlock() { rw.w.Unlock() } + +// RLocker returns a Locker interface that implements +// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. +func (rw *RWMutex) RLocker() Locker { + return (*rlocker)(rw) +} + +type rlocker RWMutex + +func (r *rlocker) Lock() { (*RWMutex)(r).RLock() } +func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() } diff --git a/src/pkg/sync/rwmutex_test.go b/src/pkg/sync/rwmutex_test.go index 111bca1e389..4f748b2191a 100644 --- a/src/pkg/sync/rwmutex_test.go +++ b/src/pkg/sync/rwmutex_test.go @@ -112,3 +112,38 @@ func TestRWMutex(t *testing.T) { HammerRWMutex(10, 10, 1000) HammerRWMutex(10, 5, 10000) } + +func TestRLocker(t *testing.T) { + var wl RWMutex + var rl Locker + wlocked := make(chan bool, 1) + rlocked := make(chan bool, 1) + rl = wl.RLocker() + n := 10 + go func() { + for i := 0; i < n; i++ { + rl.Lock() + rl.Lock() + rlocked <- true + wl.Lock() + wlocked <- true + } + }() + for i := 0; i < n; i++ { + <-rlocked + rl.Unlock() + select { + case <-wlocked: + t.Fatal("RLocker() didn't read-lock it") + default: + } + rl.Unlock() + <-wlocked + select { + case <-rlocked: + t.Fatal("RLocker() didn't respect the write lock") + default: + } + wl.Unlock() + } +}