mirror of
https://github.com/golang/go
synced 2024-11-24 09:30:07 -07:00
sync: always wake up previously sleeping goroutines on Cond.Signal
This changes the internal implementation of Cond so that it uses two generations of waiters. This enables Signal to guarantee that it will only wake up waiters that are currently sleeping at the call time. Fixes #1648. R=dvyukov, gustavo, rsc CC=golang-dev https://golang.org/cl/4524083
This commit is contained in:
parent
158b427ea5
commit
17bfa32fde
@ -14,10 +14,26 @@ import "runtime"
|
||||
// which must be held when changing the condition and
|
||||
// when calling the Wait method.
|
||||
type Cond struct {
|
||||
L Locker // held while observing or changing the condition
|
||||
m Mutex // held to avoid internal races
|
||||
waiters int // number of goroutines blocked on Wait
|
||||
sema *uint32
|
||||
L Locker // held while observing or changing the condition
|
||||
m Mutex // held to avoid internal races
|
||||
|
||||
// We must be careful to make sure that when Signal
|
||||
// releases a semaphore, the corresponding acquire is
|
||||
// executed by a goroutine that was already waiting at
|
||||
// the time of the call to Signal, not one that arrived later.
|
||||
// To ensure this, we segment waiting goroutines into
|
||||
// generations punctuated by calls to Signal. Each call to
|
||||
// Signal begins another generation if there are no goroutines
|
||||
// left in older generations for it to wake. Because of this
|
||||
// optimization (only begin another generation if there
|
||||
// are no older goroutines left), we only need to keep track
|
||||
// of the two most recent generations, which we call old
|
||||
// and new.
|
||||
oldWaiters int // number of waiters in old generation...
|
||||
oldSema *uint32 // ... waiting on this semaphore
|
||||
|
||||
newWaiters int // number of waiters in new generation...
|
||||
newSema *uint32 // ... waiting on this semaphore
|
||||
}
|
||||
|
||||
// NewCond returns a new Cond with Locker l.
|
||||
@ -42,11 +58,11 @@ func NewCond(l Locker) *Cond {
|
||||
//
|
||||
func (c *Cond) Wait() {
|
||||
c.m.Lock()
|
||||
if c.sema == nil {
|
||||
c.sema = new(uint32)
|
||||
if c.newSema == nil {
|
||||
c.newSema = new(uint32)
|
||||
}
|
||||
s := c.sema
|
||||
c.waiters++
|
||||
s := c.newSema
|
||||
c.newWaiters++
|
||||
c.m.Unlock()
|
||||
c.L.Unlock()
|
||||
runtime.Semacquire(s)
|
||||
@ -59,9 +75,16 @@ func (c *Cond) Wait() {
|
||||
// during the call.
|
||||
func (c *Cond) Signal() {
|
||||
c.m.Lock()
|
||||
if c.waiters > 0 {
|
||||
c.waiters--
|
||||
runtime.Semrelease(c.sema)
|
||||
if c.oldWaiters == 0 && c.newWaiters > 0 {
|
||||
// Retire old generation; rename new to old.
|
||||
c.oldWaiters = c.newWaiters
|
||||
c.oldSema = c.newSema
|
||||
c.newWaiters = 0
|
||||
c.newSema = nil
|
||||
}
|
||||
if c.oldWaiters > 0 {
|
||||
c.oldWaiters--
|
||||
runtime.Semrelease(c.oldSema)
|
||||
}
|
||||
c.m.Unlock()
|
||||
}
|
||||
@ -72,19 +95,19 @@ func (c *Cond) Signal() {
|
||||
// during the call.
|
||||
func (c *Cond) Broadcast() {
|
||||
c.m.Lock()
|
||||
if c.waiters > 0 {
|
||||
s := c.sema
|
||||
n := c.waiters
|
||||
for i := 0; i < n; i++ {
|
||||
runtime.Semrelease(s)
|
||||
// Wake both generations.
|
||||
if c.oldWaiters > 0 {
|
||||
for i := 0; i < c.oldWaiters; i++ {
|
||||
runtime.Semrelease(c.oldSema)
|
||||
}
|
||||
// We just issued n wakeups via the semaphore s.
|
||||
// To ensure that they wake up the existing waiters
|
||||
// and not waiters that arrive after Broadcast returns,
|
||||
// clear c.sema. The next operation will allocate
|
||||
// a new one.
|
||||
c.sema = nil
|
||||
c.waiters = 0
|
||||
c.oldWaiters = 0
|
||||
}
|
||||
if c.newWaiters > 0 {
|
||||
for i := 0; i < c.newWaiters; i++ {
|
||||
runtime.Semrelease(c.newSema)
|
||||
}
|
||||
c.newWaiters = 0
|
||||
c.newSema = nil
|
||||
}
|
||||
c.m.Unlock()
|
||||
}
|
||||
|
@ -46,6 +46,33 @@ func TestCondSignal(t *testing.T) {
|
||||
c.Signal()
|
||||
}
|
||||
|
||||
func TestCondSignalGenerations(t *testing.T) {
|
||||
var m Mutex
|
||||
c := NewCond(&m)
|
||||
n := 100
|
||||
running := make(chan bool, n)
|
||||
awake := make(chan int, n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(i int) {
|
||||
m.Lock()
|
||||
running <- true
|
||||
c.Wait()
|
||||
awake <- i
|
||||
m.Unlock()
|
||||
}(i)
|
||||
if i > 0 {
|
||||
a := <-awake
|
||||
if a != i-1 {
|
||||
t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a)
|
||||
}
|
||||
}
|
||||
<-running
|
||||
m.Lock()
|
||||
c.Signal()
|
||||
m.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func TestCondBroadcast(t *testing.T) {
|
||||
var m Mutex
|
||||
c := NewCond(&m)
|
||||
|
Loading…
Reference in New Issue
Block a user