diff --git a/src/pkg/sync/rwmutex.go b/src/pkg/sync/rwmutex.go
index 9248b4b037..cb1a47720b 100644
--- a/src/pkg/sync/rwmutex.go
+++ b/src/pkg/sync/rwmutex.go
@@ -4,7 +4,10 @@
 
 package sync
 
-import "sync/atomic"
+import (
+	"runtime"
+	"sync/atomic"
+)
 
 // An RWMutex is a reader/writer mutual exclusion lock.
 // The lock can be held by an arbitrary number of readers
@@ -12,35 +15,22 @@ import "sync/atomic"
 // RWMutexes can be created as part of other
 // structures; the zero value for a RWMutex is
 // an unlocked mutex.
-//
-// Writers take priority over Readers: no new RLocks
-// are granted while a blocked Lock call is waiting.
 type RWMutex struct {
-	w           Mutex // held if there are pending readers or writers
-	r           Mutex // held if the w is being rd
-	readerCount int32 // number of pending readers
+	w           Mutex  // held if there are pending writers
+	writerSem   uint32 // semaphore for writers to wait for completing readers
+	readerSem   uint32 // semaphore for readers to wait for completing writers
+	readerCount int32  // number of pending readers
+	readerWait  int32  // number of departing readers
 }
 
+const rwmutexMaxReaders = 1 << 30
+
 // RLock locks rw for reading.
-// If the lock is already locked for writing or there is a writer already waiting
-// to release the lock, RLock blocks until the writer has released the lock.
 func (rw *RWMutex) RLock() {
-	// Use rw.r.Lock() to block granting the RLock if a goroutine
-	// is waiting for its Lock. This is the prevent starvation of W in
-	// this situation:
-	//   A: rw.RLock() // granted
-	//   W: rw.Lock()  // waiting for rw.w().Lock()
-	//   B: rw.RLock() // granted
-	//   C: rw.RLock() // granted
-	//   B: rw.RUnlock()
-	//   ... (new readers come and go indefinitely, W is starving)
-	rw.r.Lock()
-	if atomic.AddInt32(&rw.readerCount, 1) == 1 {
-		// The first reader locks rw.w, so writers will be blocked
-		// while the readers have the RLock.
-		rw.w.Lock()
+	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
+		// A writer is pending, wait for it.
+		runtime.Semacquire(&rw.readerSem)
 	}
-	rw.r.Unlock()
 }
 
 // RUnlock undoes a single RLock call;
@@ -48,9 +38,12 @@ func (rw *RWMutex) RLock() {
 // It is a run-time error if rw is not locked for reading
 // on entry to RUnlock.
 func (rw *RWMutex) RUnlock() {
-	if atomic.AddInt32(&rw.readerCount, -1) == 0 {
-		// last reader finished, enable writers
-		rw.w.Unlock()
+	if atomic.AddInt32(&rw.readerCount, -1) < 0 {
+		// A writer is pending.
+		if atomic.AddInt32(&rw.readerWait, -1) == 0 {
+			// The last reader unblocks the writer.
+			runtime.Semrelease(&rw.writerSem)
+		}
 	}
 }
 
@@ -61,9 +54,14 @@ func (rw *RWMutex) RUnlock() {
 // a blocked Lock call excludes new readers from acquiring
 // the lock.
 func (rw *RWMutex) Lock() {
-	rw.r.Lock()
+	// First, resolve competition with other writers.
 	rw.w.Lock()
-	rw.r.Unlock()
+	// Announce to readers there is a pending writer.
+	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
+	// Wait for active readers.
+	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
+		runtime.Semacquire(&rw.writerSem)
+	}
 }
 
 // Unlock unlocks rw for writing. It is a run-time error if rw is
@@ -72,7 +70,16 @@ func (rw *RWMutex) Lock() {
 // As with Mutexes, a locked RWMutex is not associated with a particular
 // goroutine. One goroutine may RLock (Lock) an RWMutex and then
 // arrange for another goroutine to RUnlock (Unlock) it.
-func (rw *RWMutex) Unlock() { rw.w.Unlock() }
+func (rw *RWMutex) Unlock() {
+	// Announce to readers there is no active writer.
+	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
+	// Unblock blocked readers, if any.
+	for i := 0; i < int(r); i++ {
+		runtime.Semrelease(&rw.readerSem)
+	}
+	// Allow other writers to proceed.
+	rw.w.Unlock()
+}
 
 // RLocker returns a Locker interface that implements
 // the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
diff --git a/src/pkg/sync/rwmutex_test.go b/src/pkg/sync/rwmutex_test.go
index 0480a66018..dc8ce9653c 100644
--- a/src/pkg/sync/rwmutex_test.go
+++ b/src/pkg/sync/rwmutex_test.go
@@ -154,3 +154,84 @@ func TestRLocker(t *testing.T) {
 		wl.Unlock()
 	}
 }
+
+func BenchmarkRWMutexUncontended(b *testing.B) {
+	type PaddedRWMutex struct {
+		RWMutex
+		pad [32]uint32
+	}
+	const CallsPerSched = 1000
+	procs := runtime.GOMAXPROCS(-1)
+	N := int32(b.N / CallsPerSched)
+	c := make(chan bool, procs)
+	for p := 0; p < procs; p++ {
+		go func() {
+			var rwm PaddedRWMutex
+			for atomic.AddInt32(&N, -1) >= 0 {
+				runtime.Gosched()
+				for g := 0; g < CallsPerSched; g++ {
+					rwm.RLock()
+					rwm.RLock()
+					rwm.RUnlock()
+					rwm.RUnlock()
+					rwm.Lock()
+					rwm.Unlock()
+				}
+			}
+			c <- true
+		}()
+	}
+	for p := 0; p < procs; p++ {
+		<-c
+	}
+}
+
+func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
+	const CallsPerSched = 1000
+	procs := runtime.GOMAXPROCS(-1)
+	N := int32(b.N / CallsPerSched)
+	c := make(chan bool, procs)
+	var rwm RWMutex
+	for p := 0; p < procs; p++ {
+		go func() {
+			foo := 0
+			for atomic.AddInt32(&N, -1) >= 0 {
+				runtime.Gosched()
+				for g := 0; g < CallsPerSched; g++ {
+					foo++
+					if foo%writeRatio == 0 {
+						rwm.Lock()
+						rwm.Unlock()
+					} else {
+						rwm.RLock()
+						for i := 0; i != localWork; i += 1 {
+							foo *= 2
+							foo /= 2
+						}
+						rwm.RUnlock()
+					}
+				}
+			}
+			c <- foo == 42
+		}()
+	}
+	for p := 0; p < procs; p++ {
+		<-c
+	}
+}
+
+func BenchmarkRWMutexWrite100(b *testing.B) {
+	benchmarkRWMutex(b, 0, 100)
+}
+
+func BenchmarkRWMutexWrite10(b *testing.B) {
+	benchmarkRWMutex(b, 0, 10)
+}
+
+func BenchmarkRWMutexWorkWrite100(b *testing.B) {
+	benchmarkRWMutex(b, 100, 100)
+}
+
+func BenchmarkRWMutexWorkWrite10(b *testing.B) {
+	benchmarkRWMutex(b, 100, 10)
+}
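
For reference only, not part of the patch: the sketch below re-creates the new counter/semaphore handshake outside the runtime, using buffered channels in place of runtime.Semacquire/Semrelease, so the readerCount/readerWait arithmetic can be read and run in isolation. All names here (toyRWMutex, newToyRWMutex, maxReaders, the channel capacities) are assumptions made for this sketch and are not part of the sync package.

// Illustrative sketch only -- NOT the change above. Buffered channels
// stand in for the runtime semaphores; names are invented for this demo.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxReaders = 1 << 30 // mirrors rwmutexMaxReaders

type toyRWMutex struct {
	w           sync.Mutex    // serializes writers
	writerSem   chan struct{} // writer waits here for the last departing reader
	readerSem   chan struct{} // readers wait here while a writer is pending
	readerCount int32         // goes negative once a writer has announced itself
	readerWait  int32         // readers the pending writer still has to wait for
}

func newToyRWMutex() *toyRWMutex {
	// Capacity 1024 is an assumption of this sketch: a real counting
	// semaphore never blocks the releasing side, a channel can.
	return &toyRWMutex{
		writerSem: make(chan struct{}, 1),
		readerSem: make(chan struct{}, 1024),
	}
}

func (m *toyRWMutex) RLock() {
	if atomic.AddInt32(&m.readerCount, 1) < 0 {
		<-m.readerSem // a writer is pending; wait for it to finish
	}
}

func (m *toyRWMutex) RUnlock() {
	if atomic.AddInt32(&m.readerCount, -1) < 0 {
		if atomic.AddInt32(&m.readerWait, -1) == 0 {
			m.writerSem <- struct{}{} // last departing reader unblocks the writer
		}
	}
}

func (m *toyRWMutex) Lock() {
	m.w.Lock()
	// Drive readerCount negative; r is how many readers were active.
	r := atomic.AddInt32(&m.readerCount, -maxReaders) + maxReaders
	if r != 0 && atomic.AddInt32(&m.readerWait, r) != 0 {
		<-m.writerSem // wait until the last of those readers departs
	}
}

func (m *toyRWMutex) Unlock() {
	// Make readerCount non-negative; r is how many readers queued up meanwhile.
	r := atomic.AddInt32(&m.readerCount, maxReaders)
	for i := 0; i < int(r); i++ {
		m.readerSem <- struct{}{} // release each queued reader
	}
	m.w.Unlock()
}

func main() {
	m := newToyRWMutex()
	var wg sync.WaitGroup
	data := 0
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.RLock()
			_ = data // read side
			m.RUnlock()
		}()
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		m.Lock()
		data++ // write side
		m.Unlock()
	}()
	wg.Wait()
	fmt.Println("data:", data) // always prints 1
}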