author     Austin Clements <austin@google.com>    2019-03-01 13:16:37 -0500
committer  Austin Clements <austin@google.com>    2019-04-05 18:49:03 +0000
commit     2b605670020fc637e20a609b1dc86c1f0e7afdd1 (patch)
tree       fa5ecc1c95a04fe0c1009fd6d1702aec631df295 /src/sync
parent     e47ced78578c471cbcd34a7d6b223a71e84a46c8 (diff)
download   go-2b605670020fc637e20a609b1dc86c1f0e7afdd1.tar.gz
           go-2b605670020fc637e20a609b1dc86c1f0e7afdd1.zip
sync: internal fixed size lock-free queue for sync.Pool
This is the first step toward fixing multiple issues with sync.Pool.
This adds a fixed size, lock-free, single-producer, multi-consumer
queue that will be used in the new Pool stealing implementation.

For #22950, #22331.

Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5
Reviewed-on: https://go-review.googlesource.com/c/go/+/166957
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
Diffstat (limited to 'src/sync')
-rw-r--r--  src/sync/export_test.go    25
-rw-r--r--  src/sync/pool_test.go      73
-rw-r--r--  src/sync/poolqueue.go     185
3 files changed, 283 insertions, 0 deletions
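
Before the patch itself, a minimal sketch of the usage pattern the commit message describes: a single producer pushes at the head while any number of consumers steal from the tail. It is written against the test-only exports this change adds (NewPoolDequeue, PushHead, PopTail), so it would only compile alongside the sync package's own external tests (package sync_test with a dot-import of sync); the capacity, element count, and worker count are illustrative assumptions, not values taken from the change.

package sync_test

import (
	"runtime"
	. "sync"
	"sync/atomic"
)

// sketchPoolDequeue is a hypothetical helper, not part of the patch. Exactly
// one goroutine owns the head (PushHead/PopHead); any number of goroutines
// may steal from the tail (PopTail).
func sketchPoolDequeue() {
	const N = 1000
	d := NewPoolDequeue(16) // capacity must be a power of two

	var got int32
	var wg WaitGroup

	// Consumers: steal from the tail until every element has been seen.
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for atomic.LoadInt32(&got) < N {
				if _, ok := d.PopTail(); ok {
					atomic.AddInt32(&got, 1)
				} else {
					runtime.Gosched() // queue momentarily empty
				}
			}
		}()
	}

	// The single producer pushes at the head, retrying while the ring is full.
	for i := 0; i < N; i++ {
		for !d.PushHead(i) {
			runtime.Gosched() // full; let the consumers drain a slot
		}
	}
	wg.Wait()
}

TestPoolDequeue in the diff below follows the same shape, but additionally pops from the head on the producer side and verifies that every pushed value is popped exactly once.
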
diff --git a/src/sync/export_test.go b/src/sync/export_test.go
index 669076efad..0252b64f58 100644
--- a/src/sync/export_test.go
+++ b/src/sync/export_test.go
@@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire
var Runtime_Semrelease = runtime_Semrelease
var Runtime_procPin = runtime_procPin
var Runtime_procUnpin = runtime_procUnpin
+
+// poolDequeue testing.
+type PoolDequeue interface {
+ PushHead(val interface{}) bool
+ PopHead() (interface{}, bool)
+ PopTail() (interface{}, bool)
+}
+
+func NewPoolDequeue(n int) PoolDequeue {
+ return &poolDequeue{
+ vals: make([]eface, n),
+ }
+}
+
+func (d *poolDequeue) PushHead(val interface{}) bool {
+ return d.pushHead(val)
+}
+
+func (d *poolDequeue) PopHead() (interface{}, bool) {
+ return d.popHead()
+}
+
+func (d *poolDequeue) PopTail() (interface{}, bool) {
+ return d.popTail()
+}
diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go
index 9e5132bb18..6e9f9f3463 100644
--- a/src/sync/pool_test.go
+++ b/src/sync/pool_test.go
@@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) {
}
}
+func TestPoolDequeue(t *testing.T) {
+ const P = 10
+ // In long mode, do enough pushes to wrap around the 16-slot
+ // ring buffer many times over.
+ N := 1<<21 + 1000
+ if testing.Short() {
+ N = 1e3
+ }
+ d := NewPoolDequeue(16)
+ have := make([]int32, N)
+ var stop int32
+ var wg WaitGroup
+
+ // Start P-1 consumers.
+ for i := 1; i < P; i++ {
+ wg.Add(1)
+ go func() {
+ fail := 0
+ for atomic.LoadInt32(&stop) == 0 {
+ val, ok := d.PopTail()
+ if ok {
+ fail = 0
+ atomic.AddInt32(&have[val.(int)], 1)
+ if val.(int) == N-1 {
+ atomic.StoreInt32(&stop, 1)
+ }
+ } else {
+ // Speed up the test by
+ // allowing the pusher to run.
+ if fail++; fail%100 == 0 {
+ runtime.Gosched()
+ }
+ }
+ }
+ wg.Done()
+ }()
+ }
+
+ // Start 1 producer.
+ nPopHead := 0
+ wg.Add(1)
+ go func() {
+ for j := 0; j < N; j++ {
+ for !d.PushHead(j) {
+ // Allow a popper to run.
+ runtime.Gosched()
+ }
+ if j%10 == 0 {
+ val, ok := d.PopHead()
+ if ok {
+ nPopHead++
+ atomic.AddInt32(&have[val.(int)], 1)
+ }
+ }
+ }
+ wg.Done()
+ }()
+ wg.Wait()
+
+ // Check results.
+ for i, count := range have {
+ if count != 1 {
+ t.Errorf("expected have[%d] = 1, got %d", i, count)
+ }
+ }
+ if nPopHead == 0 {
+ // In theory it's possible in a valid schedule for
+ // popHead to never succeed, but in practice it almost
+ // always succeeds, so this is unlikely to flake.
+ t.Errorf("popHead never succeeded")
+ }
+}
+
func BenchmarkPool(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go
new file mode 100644
index 0000000000..bc2ab647ff
--- /dev/null
+++ b/src/sync/poolqueue.go
@@ -0,0 +1,185 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// poolDequeue is a lock-free fixed-size single-producer,
+// multi-consumer queue. The single producer can both push and pop
+// from the head, and consumers can pop from the tail.
+//
+// It has the added feature that it nils out unused slots to avoid
+// unnecessary retention of objects. This is important for sync.Pool,
+// but not typically a property considered in the literature.
+type poolDequeue struct {
+ // headTail packs together a 32-bit head index and a 32-bit
+ // tail index. Both are indexes into vals modulo len(vals).
+ //
+ // tail = index of oldest data in queue
+ // head = index of next slot to fill
+ //
+ // Slots in the range [tail, head) are owned by consumers.
+ // A consumer continues to own a slot outside this range until
+ // it nils the slot, at which point ownership passes to the
+ // producer.
+ //
+ // The head index is stored in the most-significant bits so
+ // that we can atomically add to it and the overflow is
+ // harmless.
+ headTail uint64
+
+ // vals is a ring buffer of interface{} values stored in this
+ // dequeue. The size of this must be a power of 2.
+ //
+ // vals[i].typ is nil if the slot is empty and non-nil
+ // otherwise. A slot is still in use until *both* the tail
+ // index has moved beyond it and typ has been set to nil. This
+ // is set to nil atomically by the consumer and read
+ // atomically by the producer.
+ vals []eface
+}
+
+type eface struct {
+ typ, val unsafe.Pointer
+}
+
+const dequeueBits = 32
+
+// dequeueLimit is the maximum size of a poolDequeue.
+//
+// This is half of 1<<dequeueBits because detecting fullness depends
+// on wrapping around the ring buffer without wrapping around the
+// index.
+const dequeueLimit = (1 << dequeueBits) / 2
+
+// dequeueNil is used in poolDequeue to represent interface{}(nil).
+// Since we use nil to represent empty slots, we need a sentinel value
+// to represent nil.
+type dequeueNil *struct{}
+
+func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
+ const mask = 1<<dequeueBits - 1
+ head = uint32((ptrs >> dequeueBits) & mask)
+ tail = uint32(ptrs & mask)
+ return
+}
+
+func (d *poolDequeue) pack(head, tail uint32) uint64 {
+ const mask = 1<<dequeueBits - 1
+ return (uint64(head) << dequeueBits) |
+ uint64(tail&mask)
+}
+
+// pushHead adds val at the head of the queue. It returns false if the
+// queue is full. It must only be called by a single producer.
+func (d *poolDequeue) pushHead(val interface{}) bool {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
+ // Queue is full.
+ return false
+ }
+ slot := &d.vals[head&uint32(len(d.vals)-1)]
+
+ // Check if the head slot has been released by popTail.
+ typ := atomic.LoadPointer(&slot.typ)
+ if typ != nil {
+ // Another goroutine is still cleaning up the tail, so
+ // the queue is actually still full.
+ return false
+ }
+
+ // The head slot is free, so we own it.
+ if val == nil {
+ val = dequeueNil(nil)
+ }
+ *(*interface{})(unsafe.Pointer(slot)) = val
+
+ // Increment head. This passes ownership of slot to popTail
+ // and acts as a store barrier for writing the slot.
+ atomic.AddUint64(&d.headTail, 1<<dequeueBits)
+ return true
+}
+
+// popHead removes and returns the element at the head of the queue.
+// It returns false if the queue is empty. It must only be called by a
+// single producer.
+func (d *poolDequeue) popHead() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm tail and decrement head. We do this before
+ // reading the value to take back ownership of this
+ // slot.
+ head--
+ ptrs2 := d.pack(head, tail)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // We successfully took back slot.
+ slot = &d.vals[head&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+ // Zero the slot. Unlike popTail, this isn't racing with
+ // pushHead, so we don't need to be careful here.
+ *slot = eface{}
+ return val, true
+}
+
+// popTail removes and returns the element at the tail of the queue.
+// It returns false if the queue is empty. It may be called by any
+// number of consumers.
+func (d *poolDequeue) popTail() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm head and tail (for our speculative check
+ // above) and increment tail. If this succeeds, then
+ // we own the slot at tail.
+ ptrs2 := d.pack(head, tail+1)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // Success.
+ slot = &d.vals[tail&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ // We now own slot.
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+
+ // Tell pushHead that we're done with this slot. Zeroing the
+ // slot is also important so we don't leave behind references
+ // that could keep this object live longer than necessary.
+ //
+ // We write to val first and then publish that we're done with
+ // this slot by atomically writing to typ.
+ slot.val = nil
+ atomic.StorePointer(&slot.typ, nil)
+ // At this point pushHead owns the slot.
+
+ return val, true
+}
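
To close, a standalone sketch of the index arithmetic above: it re-implements the pack/unpack helpers with the same 32-bit layout and walks through pushHead's fullness test. The ring size and index values are illustrative assumptions, not taken from the patch.

package main

import "fmt"

// Same layout as poolDequeue.headTail: head in the high 32 bits, tail in the
// low 32 bits. Both counters grow without bound and are reduced onto the ring
// only when a slot is addressed.
const dequeueBits = 32
const mask = 1<<dequeueBits - 1

func pack(head, tail uint32) uint64 {
	return uint64(head)<<dequeueBits | uint64(tail&mask)
}

func unpack(ptrs uint64) (head, tail uint32) {
	return uint32(ptrs>>dequeueBits) & mask, uint32(ptrs & mask)
}

func main() {
	const size = 8 // stands in for len(d.vals); must be a power of two

	// Empty queue: head == tail.
	head, tail := unpack(pack(0, 0))

	// The producer fills every slot, advancing head once per push.
	head += size

	// pushHead's fullness test: the queue is full when tail plus the ring
	// size, reduced mod 2^32, has caught up with head.
	fmt.Println((tail+size)&mask == head) // true: no free slot left

	// The ring position is computed only when a slot is addressed,
	// mirroring d.vals[head&uint32(len(d.vals)-1)] in pushHead.
	fmt.Println(head & (size - 1)) // 0: head has wrapped back to slot 0
}

Note that the producer's pushHead never needs a compare-and-swap on this word: only the producer advances head, so the atomic add is enough to publish a new element, while popHead and popTail hand off slot ownership through their CAS on the packed head/tail word.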