author    Austin Clements <austin@google.com>  2019-03-01 14:54:00 -0500
committer Austin Clements <austin@google.com>  2019-04-05 18:49:04 +0000
commit    57bb7be4b171a0c7ebf80467306c91e5ed8b2e84 (patch)
tree      1921016a8d240a9fd61bf9e76ab2686ef138a814 /src/sync
parent    2b605670020fc637e20a609b1dc86c1f0e7afdd1 (diff)
sync: internal dynamically sized lock-free queue for sync.Pool
This adds a dynamically sized, lock-free, single-producer,
multi-consumer queue that will be used in the new Pool stealing
implementation. It's built on top of the fixed-size queue added in
the previous CL.

For #22950, #22331.

Change-Id: Ifc0ca3895bec7e7f9289ba9fb7dd0332bf96ba5a
Reviewed-on: https://go-review.googlesource.com/c/go/+/166958
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
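As a quick orientation before the diff: a minimal sketch of the
producer/consumer contract the new type provides. poolChain and its
methods come from this patch; the wrapper function itself is
hypothetical and assumes code inside package sync.

// Hypothetical sketch, not part of this CL: how the single producer
// and the stealing consumers are meant to use poolChain.
func poolChainUsageSketch() {
	var c poolChain

	// Producer side (one goroutine only): push new values at the
	// head and pop them back LIFO.
	c.pushHead("a")
	c.pushHead("b")
	v, _ := c.popHead() // "b": head pops are LIFO

	// Consumer side (any goroutine): steal the oldest value from
	// the tail.
	w, _ := c.popTail() // "a": tail pops are FIFO w.r.t. pushes

	_, _ = v, w
}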
Diffstat (limited to 'src/sync')
-rw-r--r--  src/sync/export_test.go  |  17
-rw-r--r--  src/sync/pool_test.go    |   9
-rw-r--r--  src/sync/poolqueue.go    | 132
3 files changed, 153 insertions(+), 5 deletions(-)
diff --git a/src/sync/export_test.go b/src/sync/export_test.go
index 0252b64f58..10d3599f47 100644
--- a/src/sync/export_test.go
+++ b/src/sync/export_test.go
@@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) {
func (d *poolDequeue) PopTail() (interface{}, bool) {
return d.popTail()
}
+
+func NewPoolChain() PoolDequeue {
+ return new(poolChain)
+}
+
+func (c *poolChain) PushHead(val interface{}) bool {
+ c.pushHead(val)
+ return true
+}
+
+func (c *poolChain) PopHead() (interface{}, bool) {
+ return c.popHead()
+}
+
+func (c *poolChain) PopTail() (interface{}, bool) {
+ return c.popTail()
+}
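These wrappers give poolChain the same exported test surface as
poolDequeue, so pool_test.go can drive both through one interface.
The interface definition itself isn't part of this diff (it came with
the fixed-size queue in the previous CL), but from the wrappers above
and the call sites below it is presumably:

// Inferred shape of the test-only interface; the actual definition
// lives in export_test.go from the previous CL.
type PoolDequeue interface {
	PushHead(val interface{}) bool
	PopHead() (interface{}, bool)
	PopTail() (interface{}, bool)
}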
diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go
index 6e9f9f3463..62085b5c96 100644
--- a/src/sync/pool_test.go
+++ b/src/sync/pool_test.go
@@ -151,6 +151,14 @@ func TestPoolStress(t *testing.T) {
}
func TestPoolDequeue(t *testing.T) {
+ testPoolDequeue(t, NewPoolDequeue(16))
+}
+
+func TestPoolChain(t *testing.T) {
+ testPoolDequeue(t, NewPoolChain())
+}
+
+func testPoolDequeue(t *testing.T, d PoolDequeue) {
const P = 10
// In long mode, do enough pushes to wrap around the 21-bit
// indexes.
@@ -158,7 +166,6 @@ func TestPoolDequeue(t *testing.T) {
if testing.Short() {
N = 1e3
}
- d := NewPoolDequeue(16)
have := make([]int32, N)
var stop int32
var wg WaitGroup
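The remainder of testPoolDequeue is unchanged and elided by the diff.
Schematically, the stress pattern the setup above implies looks like
the following; this is an illustration under that assumption, not the
actual test body.

// Illustrative only: one producer pushes at the head while P-1
// consumers steal from the tail, mirroring how sync.Pool will use
// the queue. Assumes the `. "sync"` and "sync/atomic" imports that
// pool_test.go already has.
func stressSketch(d PoolDequeue, N int) {
	const P = 10
	var stop int32
	var wg WaitGroup
	for i := 1; i < P; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for atomic.LoadInt32(&stop) == 0 {
				d.PopTail() // stealing from an empty queue is fine; try again
			}
		}()
	}
	for j := 0; j < N; j++ {
		// poolChain.PushHead never reports full; the fixed-size
		// poolDequeue can, so spin until a consumer makes room.
		for !d.PushHead(j) {
		}
	}
	atomic.StoreInt32(&stop, 1)
	wg.Wait()
}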
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go
index bc2ab647ff..22f74969d9 100644
--- a/src/sync/poolqueue.go
+++ b/src/sync/poolqueue.go
@@ -52,10 +52,10 @@ const dequeueBits = 32
// dequeueLimit is the maximum size of a poolDequeue.
//
-// This is half of 1<<dequeueBits because detecting fullness depends
-// on wrapping around the ring buffer without wrapping around the
-// index.
-const dequeueLimit = (1 << dequeueBits) / 2
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
@@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) {
return val, true
}
+
+// poolChain is a dynamically-sized version of poolDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type poolChain struct {
+ // head is the poolDequeue to push to. This is only accessed
+ // by the producer, so doesn't need to be synchronized.
+ head *poolChainElt
+
+ // tail is the poolDequeue to popTail from. This is accessed
+ // by consumers, so reads and writes must be atomic.
+ tail *poolChainElt
+}
+
+type poolChainElt struct {
+ poolDequeue
+
+ // next and prev link to the adjacent poolChainElts in this
+ // poolChain.
+ //
+ // next is written atomically by the producer and read
+ // atomically by the consumer. It only transitions from nil to
+ // non-nil.
+ //
+ // prev is written atomically by the consumer and read
+ // atomically by the producer. It only transitions from
+ // non-nil to nil.
+ next, prev *poolChainElt
+}
+
+func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
+ return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *poolChain) pushHead(val interface{}) {
+ d := c.head
+ if d == nil {
+ // Initialize the chain.
+ const initSize = 8 // Must be a power of 2
+ d = new(poolChainElt)
+ d.vals = make([]eface, initSize)
+ c.head = d
+ storePoolChainElt(&c.tail, d)
+ }
+
+ if d.pushHead(val) {
+ return
+ }
+
+ // The current dequeue is full. Allocate a new one of twice
+ // the size.
+ newSize := len(d.vals) * 2
+ if newSize >= dequeueLimit {
+ // Can't make it any bigger.
+ newSize = dequeueLimit
+ }
+
+ d2 := &poolChainElt{prev: d}
+ d2.vals = make([]eface, newSize)
+ c.head = d2
+ storePoolChainElt(&d.next, d2)
+ d2.pushHead(val)
+}
+
+func (c *poolChain) popHead() (interface{}, bool) {
+ d := c.head
+ for d != nil {
+ if val, ok := d.popHead(); ok {
+ return val, ok
+ }
+ // There may still be unconsumed elements in the
+ // previous dequeue, so try backing up.
+ d = loadPoolChainElt(&d.prev)
+ }
+ return nil, false
+}
+
+func (c *poolChain) popTail() (interface{}, bool) {
+ d := loadPoolChainElt(&c.tail)
+ if d == nil {
+ return nil, false
+ }
+
+ for {
+ // It's important that we load the next pointer
+ // *before* popping the tail. In general, d may be
+ // transiently empty, but if next is non-nil before
+ // the pop and the pop fails, then d is permanently
+ // empty, which is the only condition under which it's
+ // safe to drop d from the chain.
+ d2 := loadPoolChainElt(&d.next)
+
+ if val, ok := d.popTail(); ok {
+ return val, ok
+ }
+
+ if d2 == nil {
+ // This is the only dequeue. It's empty right
+ // now, but could be pushed to in the future.
+ return nil, false
+ }
+
+ // The tail of the chain has been drained, so move on
+ // to the next dequeue. Try to drop it from the chain
+ // so the next pop doesn't have to look at the empty
+ // dequeue again.
+ if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+ // We won the race. Clear the prev pointer so
+ // the garbage collector can collect the empty
+ // dequeue and so popHead doesn't back up
+ // further than necessary.
+ storePoolChainElt(&d2.prev, nil)
+ }
+ d = d2
+ }
+}
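Putting the pieces together, here is a small hypothetical walkthrough
(inside package sync, not part of this CL) of the growth policy
described in the poolChain comment and the drain-and-unlink behavior
of popTail:

// Hypothetical walkthrough. The first pushHead allocates an 8-slot
// dequeue; when it fills, pushHead links a new dequeue of twice the
// size, and so on, doubling up to dequeueLimit.
func chainGrowthSketch() {
	var c poolChain
	for i := 0; i < 9; i++ {
		c.pushHead(i) // the 9th push fills the 8-slot head and grows the chain
	}
	// Now len(c.head.vals) == 16 and len(c.head.prev.vals) == 8.

	// popTail drains the oldest (8-slot) dequeue first; once that
	// dequeue is permanently empty, the CAS in popTail unlinks it
	// so later pops skip straight to the 16-slot one.
	for {
		if _, ok := c.popTail(); !ok {
			break // both dequeues drained; chain reports empty
		}
	}
}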