diff options
author | Austin Clements <austin@google.com> | 2019-03-01 14:54:00 -0500 |
---|---|---|
committer | Austin Clements <austin@google.com> | 2019-04-05 18:49:04 +0000 |
commit | 57bb7be4b171a0c7ebf80467306c91e5ed8b2e84 (patch) | |
tree | 1921016a8d240a9fd61bf9e76ab2686ef138a814 /src/sync | |
parent | 2b605670020fc637e20a609b1dc86c1f0e7afdd1 (diff) | |
download | go-57bb7be4b171a0c7ebf80467306c91e5ed8b2e84.tar.gz go-57bb7be4b171a0c7ebf80467306c91e5ed8b2e84.zip |
sync: internal dynamically sized lock-free queue for sync.Pool
This adds a dynamically sized, lock-free, single-producer,
multi-consumer queue that will be used in the new Pool stealing
implementation. It's built on top of the fixed-size queue added in the
previous CL.
For #22950, #22331.
Change-Id: Ifc0ca3895bec7e7f9289ba9fb7dd0332bf96ba5a
Reviewed-on: https://go-review.googlesource.com/c/go/+/166958
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
Diffstat (limited to 'src/sync')
-rw-r--r-- | src/sync/export_test.go | 17 | ||||
-rw-r--r-- | src/sync/pool_test.go | 9 | ||||
-rw-r--r-- | src/sync/poolqueue.go | 132 |
3 files changed, 153 insertions, 5 deletions
diff --git a/src/sync/export_test.go b/src/sync/export_test.go index 0252b64f58..10d3599f47 100644 --- a/src/sync/export_test.go +++ b/src/sync/export_test.go @@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) { func (d *poolDequeue) PopTail() (interface{}, bool) { return d.popTail() } + +func NewPoolChain() PoolDequeue { + return new(poolChain) +} + +func (c *poolChain) PushHead(val interface{}) bool { + c.pushHead(val) + return true +} + +func (c *poolChain) PopHead() (interface{}, bool) { + return c.popHead() +} + +func (c *poolChain) PopTail() (interface{}, bool) { + return c.popTail() +} diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go index 6e9f9f3463..62085b5c96 100644 --- a/src/sync/pool_test.go +++ b/src/sync/pool_test.go @@ -151,6 +151,14 @@ func TestPoolStress(t *testing.T) { } func TestPoolDequeue(t *testing.T) { + testPoolDequeue(t, NewPoolDequeue(16)) +} + +func TestPoolChain(t *testing.T) { + testPoolDequeue(t, NewPoolChain()) +} + +func testPoolDequeue(t *testing.T, d PoolDequeue) { const P = 10 // In long mode, do enough pushes to wrap around the 21-bit // indexes. @@ -158,7 +166,6 @@ func TestPoolDequeue(t *testing.T) { if testing.Short() { N = 1e3 } - d := NewPoolDequeue(16) have := make([]int32, N) var stop int32 var wg WaitGroup diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go index bc2ab647ff..22f74969d9 100644 --- a/src/sync/poolqueue.go +++ b/src/sync/poolqueue.go @@ -52,10 +52,10 @@ const dequeueBits = 32 // dequeueLimit is the maximum size of a poolDequeue. // -// This is half of 1<<dequeueBits because detecting fullness depends -// on wrapping around the ring buffer without wrapping around the -// index. -const dequeueLimit = (1 << dequeueBits) / 2 +// This must be at most (1<<dequeueBits)/2 because detecting fullness +// depends on wrapping around the ring buffer without wrapping around +// the index. We divide by 4 so this fits in an int on 32-bit. +const dequeueLimit = (1 << dequeueBits) / 4 // dequeueNil is used in poolDeqeue to represent interface{}(nil). // Since we use nil to represent empty slots, we need a sentinel value @@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) { return val, true } + +// poolChain is a dynamically-sized version of poolDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type poolChain struct { + // head is the poolDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *poolChainElt + + // tail is the poolDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *poolChainElt +} + +type poolChainElt struct { + poolDequeue + + // next and prev link to the adjacent poolChainElts in this + // poolChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *poolChainElt +} + +func storePoolChainElt(pp **poolChainElt, v *poolChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **poolChainElt) *poolChainElt { + return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *poolChain) pushHead(val interface{}) { + d := c.head + if d == nil { + // Initialize the chain. + const initSize = 8 // Must be a power of 2 + d = new(poolChainElt) + d.vals = make([]eface, initSize) + c.head = d + storePoolChainElt(&c.tail, d) + } + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &poolChainElt{prev: d} + d2.vals = make([]eface, newSize) + c.head = d2 + storePoolChainElt(&d.next, d2) + d2.pushHead(val) +} + +func (c *poolChain) popHead() (interface{}, bool) { + d := c.head + for d != nil { + if val, ok := d.popHead(); ok { + return val, ok + } + // There may still be unconsumed elements in the + // previous dequeue, so try backing up. + d = loadPoolChainElt(&d.prev) + } + return nil, false +} + +func (c *poolChain) popTail() (interface{}, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the pop and the pop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next pop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } +} |