diff --git a/src/sync/export_test.go b/src/sync/export_test.go
index 0252b64f589..10d3599f476 100644
--- a/src/sync/export_test.go
+++ b/src/sync/export_test.go
@@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) {
 func (d *poolDequeue) PopTail() (interface{}, bool) {
 	return d.popTail()
 }
+
+func NewPoolChain() PoolDequeue {
+	return new(poolChain)
+}
+
+func (c *poolChain) PushHead(val interface{}) bool {
+	c.pushHead(val)
+	return true
+}
+
+func (c *poolChain) PopHead() (interface{}, bool) {
+	return c.popHead()
+}
+
+func (c *poolChain) PopTail() (interface{}, bool) {
+	return c.popTail()
+}
diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go
index 6e9f9f3463c..62085b5c966 100644
--- a/src/sync/pool_test.go
+++ b/src/sync/pool_test.go
@@ -151,6 +151,14 @@ func TestPoolStress(t *testing.T) {
 }
 
 func TestPoolDequeue(t *testing.T) {
+	testPoolDequeue(t, NewPoolDequeue(16))
+}
+
+func TestPoolChain(t *testing.T) {
+	testPoolDequeue(t, NewPoolChain())
+}
+
+func testPoolDequeue(t *testing.T, d PoolDequeue) {
 	const P = 10
 	// In long mode, do enough pushes to wrap around the 21-bit
 	// indexes.
@@ -158,7 +166,6 @@ func TestPoolDequeue(t *testing.T) {
 	if testing.Short() {
 		N = 1e3
 	}
-	d := NewPoolDequeue(16)
 	have := make([]int32, N)
 	var stop int32
 	var wg WaitGroup
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go
index bc2ab647ffb..22f74969d96 100644
--- a/src/sync/poolqueue.go
+++ b/src/sync/poolqueue.go
@@ -52,10 +52,10 @@ const dequeueBits = 32
 
 // dequeueLimit is the maximum size of a poolDequeue.
 //
-// This is half of 1<<dequeueBits because detecting fullness depends
-// on wrapping around the ring buffer without wrapping around the
-// index.
-const dequeueLimit = (1 << dequeueBits) / 2
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
 
 // dequeueNil is used in poolDeqeue to represent interface{}(nil).
 // Since we use nil to represent empty slots, we need a sentinel value
@@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) {
 
 	return val, true
 }
+
+// poolChain is a dynamically-sized version of poolDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type poolChain struct {
+	// head is the poolDequeue to push to. This is only accessed
+	// by the producer, so doesn't need to be synchronized.
+	head *poolChainElt
+
+	// tail is the poolDequeue to popTail from. This is accessed
+	// by consumers, so reads and writes must be atomic.
+	tail *poolChainElt
+}
+
+type poolChainElt struct {
+	poolDequeue
+
+	// next and prev link to the adjacent poolChainElts in this
+	// poolChain.
+	//
+	// next is written atomically by the producer and read
+	// atomically by the consumer. It only transitions from nil to
+	// non-nil.
+	//
+	// prev is written atomically by the consumer and read
+	// atomically by the producer. It only transitions from
+	// non-nil to nil.
+	next, prev *poolChainElt
+}
+
+func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
+	return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *poolChain) pushHead(val interface{}) {
+	d := c.head
+	if d == nil {
+		// Initialize the chain.
+		const initSize = 8 // Must be a power of 2
+		d = new(poolChainElt)
+		d.vals = make([]eface, initSize)
+		c.head = d
+		storePoolChainElt(&c.tail, d)
+	}
+
+	if d.pushHead(val) {
+		return
+	}
+
+	// The current dequeue is full. Allocate a new one of twice
+	// the size.
+	newSize := len(d.vals) * 2
+	if newSize >= dequeueLimit {
+		// Can't make it any bigger.
+		newSize = dequeueLimit
+	}
+
+	d2 := &poolChainElt{prev: d}
+	d2.vals = make([]eface, newSize)
+	c.head = d2
+	storePoolChainElt(&d.next, d2)
+	d2.pushHead(val)
+}
+
+func (c *poolChain) popHead() (interface{}, bool) {
+	d := c.head
+	for d != nil {
+		if val, ok := d.popHead(); ok {
+			return val, ok
+		}
+		// There may still be unconsumed elements in the
+		// previous dequeue, so try backing up.
+		d = loadPoolChainElt(&d.prev)
+	}
+	return nil, false
+}
+
+func (c *poolChain) popTail() (interface{}, bool) {
+	d := loadPoolChainElt(&c.tail)
+	if d == nil {
+		return nil, false
+	}
+
+	for {
+		// It's important that we load the next pointer
+		// *before* popping the tail. In general, d may be
+		// transiently empty, but if next is non-nil before
+		// the pop and the pop fails, then d is permanently
+		// empty, which is the only condition under which it's
+		// safe to drop d from the chain.
+		d2 := loadPoolChainElt(&d.next)
+
+		if val, ok := d.popTail(); ok {
+			return val, ok
+		}
+
+		if d2 == nil {
+			// This is the only dequeue. It's empty right
+			// now, but could be pushed to in the future.
+			return nil, false
+		}
+
+		// The tail of the chain has been drained, so move on
+		// to the next dequeue. Try to drop it from the chain
+		// so the next pop doesn't have to look at the empty
+		// dequeue again.
+		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+			// We won the race. Clear the prev pointer so
+			// the garbage collector can collect the empty
+			// dequeue and so popHead doesn't back up
+			// further than necessary.
+			storePoolChainElt(&d2.prev, nil)
+		}
+		d = d2
+	}
+}
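
For illustration only, not part of the patch: the NewPoolChain, PushHead, and PopTail shims added in export_test.go also make it easy to write small single-goroutine checks next to pool_test.go in package sync_test (which dot-imports sync). The sketch below assumes exactly that placement; the test name TestPoolChainFIFO and the element count are made up. It pushes enough values to force the chain past its initial ring and verifies that the tail yields them in FIFO order.

func TestPoolChainFIFO(t *testing.T) {
	d := NewPoolChain()
	const n = 100 // more than the initial ring, so the chain must grow
	for i := 0; i < n; i++ {
		if !d.PushHead(i) {
			t.Fatalf("PushHead(%d) failed", i)
		}
	}
	// Values come back out of the tail in the order they were pushed,
	// even though they now span several linked dequeues.
	for i := 0; i < n; i++ {
		v, ok := d.PopTail()
		if !ok || v.(int) != i {
			t.Fatalf("PopTail = %v, %v; want %v, true", v, ok, i)
		}
	}
	if _, ok := d.PopTail(); ok {
		t.Fatal("PopTail succeeded on an empty chain")
	}
}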
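
Also for illustration, a standalone sketch of the growth policy that poolChain.pushHead encodes: each new dequeue doubles the previous ring size, capped at dequeueLimit. The constants mirror dequeueBits and dequeueLimit from poolqueue.go; the rest (package main, the starting size) is just for the demo, though 8 matches the chain's initSize. With the cap at (1<<32)/4 = 1<<30, every size in the progression still fits in an int on 32-bit platforms, which is what the revised comment on dequeueLimit is getting at.

package main

import "fmt"

// These mirror the constants in poolqueue.go.
const (
	dequeueBits  = 32
	dequeueLimit = (1 << dequeueBits) / 4
)

func main() {
	size := 8 // the chain's initial ring size
	for {
		fmt.Println(size)
		if size == dequeueLimit {
			break // the chain never grows past dequeueLimit
		}
		newSize := size * 2
		if newSize >= dequeueLimit {
			// Same cap as poolChain.pushHead.
			newSize = dequeueLimit
		}
		size = newSize
	}
}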