mirror of
https://github.com/golang/go
synced 2024-11-22 00:24:41 -07:00
sync: Proposal for barrier implementation
As discussed in the mailing list, this adds a simple barrier implementation to the sync package which enables one or more goroutines to wait for a counter to go down to zero. R=rsc, rog, r CC=golang-dev https://golang.org/cl/3770045
This commit is contained in:
parent
838b5ad9d6
commit
63457d089e
@ -9,6 +9,7 @@ GOFILES=\
|
||||
mutex.go\
|
||||
once.go \
|
||||
rwmutex.go\
|
||||
waitgroup.go\
|
||||
|
||||
# 386-specific object files
|
||||
OFILES_386=\
|
||||
|
@ -3,10 +3,10 @@
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// The sync package provides basic synchronization primitives
|
||||
// such as mutual exclusion locks. Other than the Once type,
|
||||
// most are intended for use by low-level library routines.
|
||||
// Higher-level synchronization is better done via channels
|
||||
// and communication.
|
||||
// such as mutual exclusion locks. Other than the Once and
|
||||
// WaitGroup types, most are intended for use by low-level
|
||||
// library routines. Higher-level synchronization is better
|
||||
// done via channels and communication.
|
||||
package sync
|
||||
|
||||
import "runtime"
|
||||
|
86
src/pkg/sync/waitgroup.go
Normal file
86
src/pkg/sync/waitgroup.go
Normal file
@ -0,0 +1,86 @@
|
||||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package sync
|
||||
|
||||
import "runtime"
|
||||
|
||||
// A WaitGroup waits for a collection of goroutines to finish.
|
||||
// The main goroutine calls Add to set the number of
|
||||
// goroutines to wait for. Then each of the goroutines
|
||||
// runs and calls Done when finished. At the same time,
|
||||
// Wait can be used to block until all goroutines have finished.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// for i := 0; i < n; i++ {
|
||||
// if !condition(i) {
|
||||
// continue
|
||||
// }
|
||||
// wg.Add(1)
|
||||
// go func() {
|
||||
// // Do something.
|
||||
// wg.Done()
|
||||
// }
|
||||
// }
|
||||
// wg.Wait()
|
||||
//
|
||||
type WaitGroup struct {
|
||||
m Mutex
|
||||
counter int
|
||||
waiters int
|
||||
sema *uint32
|
||||
}
|
||||
|
||||
// WaitGroup creates a new semaphore each time the old semaphore
|
||||
// is released. This is to avoid the following race:
|
||||
//
|
||||
// G1: Add(1)
|
||||
// G1: go G2()
|
||||
// G1: Wait() // Context switch after Unlock() and before Semacquire().
|
||||
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
|
||||
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
|
||||
// G3: Add(1) // Makes counter == 1, waiters == 0.
|
||||
// G3: go G4()
|
||||
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
|
||||
|
||||
// Add adds delta, which may be negative, to the WaitGroup counter.
|
||||
// If the counter becomes zero, all goroutines blocked on Wait() are released.
|
||||
func (wg *WaitGroup) Add(delta int) {
|
||||
wg.m.Lock()
|
||||
if delta < -wg.counter {
|
||||
wg.m.Unlock()
|
||||
panic("sync: negative WaitGroup count")
|
||||
}
|
||||
wg.counter += delta
|
||||
if wg.counter == 0 && wg.waiters > 0 {
|
||||
for i := 0; i < wg.waiters; i++ {
|
||||
runtime.Semrelease(wg.sema)
|
||||
}
|
||||
wg.waiters = 0
|
||||
wg.sema = nil
|
||||
}
|
||||
wg.m.Unlock()
|
||||
}
|
||||
|
||||
// Done decrements the WaitGroup counter.
|
||||
func (wg *WaitGroup) Done() {
|
||||
wg.Add(-1)
|
||||
}
|
||||
|
||||
// Wait blocks until the WaitGroup counter is zero.
|
||||
func (wg *WaitGroup) Wait() {
|
||||
wg.m.Lock()
|
||||
if wg.counter == 0 {
|
||||
wg.m.Unlock()
|
||||
return
|
||||
}
|
||||
wg.waiters++
|
||||
if wg.sema == nil {
|
||||
wg.sema = new(uint32)
|
||||
}
|
||||
s := wg.sema
|
||||
wg.m.Unlock()
|
||||
runtime.Semacquire(s)
|
||||
}
|
60
src/pkg/sync/waitgroup_test.go
Normal file
60
src/pkg/sync/waitgroup_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package sync_test
|
||||
|
||||
import (
|
||||
. "sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func testWaitGroup(t *testing.T, wg1 *WaitGroup, wg2 *WaitGroup) {
|
||||
n := 16
|
||||
wg1.Add(n)
|
||||
wg2.Add(n)
|
||||
exited := make(chan bool, n)
|
||||
for i := 0; i != n; i++ {
|
||||
go func(i int) {
|
||||
wg1.Done()
|
||||
wg2.Wait()
|
||||
exited <- true
|
||||
}(i)
|
||||
}
|
||||
wg1.Wait()
|
||||
for i := 0; i != n; i++ {
|
||||
select {
|
||||
case <-exited:
|
||||
t.Fatal("WaitGroup released group too soon")
|
||||
default:
|
||||
}
|
||||
wg2.Done()
|
||||
}
|
||||
for i := 0; i != n; i++ {
|
||||
<-exited // Will block if barrier fails to unlock someone.
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitGroup(t *testing.T) {
|
||||
wg1 := &WaitGroup{}
|
||||
wg2 := &WaitGroup{}
|
||||
|
||||
// Run the same test a few times to ensure barrier is in a proper state.
|
||||
for i := 0; i != 8; i++ {
|
||||
testWaitGroup(t, wg1, wg2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitGroupMisuse(t *testing.T) {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err != "sync: negative WaitGroup count" {
|
||||
t.Fatalf("Unexpected panic: %#v", err)
|
||||
}
|
||||
}()
|
||||
wg := &WaitGroup{}
|
||||
wg.Add(1)
|
||||
wg.Done()
|
||||
wg.Done()
|
||||
t.Fatal("Should panic")
|
||||
}
|
Loading…
Reference in New Issue
Block a user