mirror of
https://github.com/golang/go
synced 2024-11-24 01:10:12 -07:00
sync: internal dynamically sized lock-free queue for sync.Pool
This adds a dynamically sized, lock-free, single-producer, multi-consumer queue that will be used in the new Pool stealing implementation. It's built on top of the fixed-size queue added in the previous CL. For #22950, #22331. Change-Id: Ifc0ca3895bec7e7f9289ba9fb7dd0332bf96ba5a Reviewed-on: https://go-review.googlesource.com/c/go/+/166958 Run-TryBot: Austin Clements <austin@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: David Chase <drchase@google.com>
This commit is contained in:
parent
2b60567002
commit
57bb7be4b1
@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) {
|
||||
func (d *poolDequeue) PopTail() (interface{}, bool) {
|
||||
return d.popTail()
|
||||
}
|
||||
|
||||
func NewPoolChain() PoolDequeue {
|
||||
return new(poolChain)
|
||||
}
|
||||
|
||||
func (c *poolChain) PushHead(val interface{}) bool {
|
||||
c.pushHead(val)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *poolChain) PopHead() (interface{}, bool) {
|
||||
return c.popHead()
|
||||
}
|
||||
|
||||
func (c *poolChain) PopTail() (interface{}, bool) {
|
||||
return c.popTail()
|
||||
}
|
||||
|
@ -151,6 +151,14 @@ func TestPoolStress(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPoolDequeue(t *testing.T) {
|
||||
testPoolDequeue(t, NewPoolDequeue(16))
|
||||
}
|
||||
|
||||
func TestPoolChain(t *testing.T) {
|
||||
testPoolDequeue(t, NewPoolChain())
|
||||
}
|
||||
|
||||
func testPoolDequeue(t *testing.T, d PoolDequeue) {
|
||||
const P = 10
|
||||
// In long mode, do enough pushes to wrap around the 21-bit
|
||||
// indexes.
|
||||
@ -158,7 +166,6 @@ func TestPoolDequeue(t *testing.T) {
|
||||
if testing.Short() {
|
||||
N = 1e3
|
||||
}
|
||||
d := NewPoolDequeue(16)
|
||||
have := make([]int32, N)
|
||||
var stop int32
|
||||
var wg WaitGroup
|
||||
|
@ -52,10 +52,10 @@ const dequeueBits = 32
|
||||
|
||||
// dequeueLimit is the maximum size of a poolDequeue.
|
||||
//
|
||||
// This is half of 1<<dequeueBits because detecting fullness depends
|
||||
// on wrapping around the ring buffer without wrapping around the
|
||||
// index.
|
||||
const dequeueLimit = (1 << dequeueBits) / 2
|
||||
// This must be at most (1<<dequeueBits)/2 because detecting fullness
|
||||
// depends on wrapping around the ring buffer without wrapping around
|
||||
// the index. We divide by 4 so this fits in an int on 32-bit.
|
||||
const dequeueLimit = (1 << dequeueBits) / 4
|
||||
|
||||
// dequeueNil is used in poolDeqeue to represent interface{}(nil).
|
||||
// Since we use nil to represent empty slots, we need a sentinel value
|
||||
@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) {
|
||||
|
||||
return val, true
|
||||
}
|
||||
|
||||
// poolChain is a dynamically-sized version of poolDequeue.
|
||||
//
|
||||
// This is implemented as a doubly-linked list queue of poolDequeues
|
||||
// where each dequeue is double the size of the previous one. Once a
|
||||
// dequeue fills up, this allocates a new one and only ever pushes to
|
||||
// the latest dequeue. Pops happen from the other end of the list and
|
||||
// once a dequeue is exhausted, it gets removed from the list.
|
||||
type poolChain struct {
|
||||
// head is the poolDequeue to push to. This is only accessed
|
||||
// by the producer, so doesn't need to be synchronized.
|
||||
head *poolChainElt
|
||||
|
||||
// tail is the poolDequeue to popTail from. This is accessed
|
||||
// by consumers, so reads and writes must be atomic.
|
||||
tail *poolChainElt
|
||||
}
|
||||
|
||||
type poolChainElt struct {
|
||||
poolDequeue
|
||||
|
||||
// next and prev link to the adjacent poolChainElts in this
|
||||
// poolChain.
|
||||
//
|
||||
// next is written atomically by the producer and read
|
||||
// atomically by the consumer. It only transitions from nil to
|
||||
// non-nil.
|
||||
//
|
||||
// prev is written atomically by the consumer and read
|
||||
// atomically by the producer. It only transitions from
|
||||
// non-nil to nil.
|
||||
next, prev *poolChainElt
|
||||
}
|
||||
|
||||
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
|
||||
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
|
||||
}
|
||||
|
||||
func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
|
||||
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
|
||||
}
|
||||
|
||||
func (c *poolChain) pushHead(val interface{}) {
|
||||
d := c.head
|
||||
if d == nil {
|
||||
// Initialize the chain.
|
||||
const initSize = 8 // Must be a power of 2
|
||||
d = new(poolChainElt)
|
||||
d.vals = make([]eface, initSize)
|
||||
c.head = d
|
||||
storePoolChainElt(&c.tail, d)
|
||||
}
|
||||
|
||||
if d.pushHead(val) {
|
||||
return
|
||||
}
|
||||
|
||||
// The current dequeue is full. Allocate a new one of twice
|
||||
// the size.
|
||||
newSize := len(d.vals) * 2
|
||||
if newSize >= dequeueLimit {
|
||||
// Can't make it any bigger.
|
||||
newSize = dequeueLimit
|
||||
}
|
||||
|
||||
d2 := &poolChainElt{prev: d}
|
||||
d2.vals = make([]eface, newSize)
|
||||
c.head = d2
|
||||
storePoolChainElt(&d.next, d2)
|
||||
d2.pushHead(val)
|
||||
}
|
||||
|
||||
func (c *poolChain) popHead() (interface{}, bool) {
|
||||
d := c.head
|
||||
for d != nil {
|
||||
if val, ok := d.popHead(); ok {
|
||||
return val, ok
|
||||
}
|
||||
// There may still be unconsumed elements in the
|
||||
// previous dequeue, so try backing up.
|
||||
d = loadPoolChainElt(&d.prev)
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (c *poolChain) popTail() (interface{}, bool) {
|
||||
d := loadPoolChainElt(&c.tail)
|
||||
if d == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
for {
|
||||
// It's important that we load the next pointer
|
||||
// *before* popping the tail. In general, d may be
|
||||
// transiently empty, but if next is non-nil before
|
||||
// the pop and the pop fails, then d is permanently
|
||||
// empty, which is the only condition under which it's
|
||||
// safe to drop d from the chain.
|
||||
d2 := loadPoolChainElt(&d.next)
|
||||
|
||||
if val, ok := d.popTail(); ok {
|
||||
return val, ok
|
||||
}
|
||||
|
||||
if d2 == nil {
|
||||
// This is the only dequeue. It's empty right
|
||||
// now, but could be pushed to in the future.
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// The tail of the chain has been drained, so move on
|
||||
// to the next dequeue. Try to drop it from the chain
|
||||
// so the next pop doesn't have to look at the empty
|
||||
// dequeue again.
|
||||
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
|
||||
// We won the race. Clear the prev pointer so
|
||||
// the garbage collector can collect the empty
|
||||
// dequeue and so popHead doesn't back up
|
||||
// further than necessary.
|
||||
storePoolChainElt(&d2.prev, nil)
|
||||
}
|
||||
d = d2
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user