diff options
author | Austin Clements <austin@google.com> | 2019-03-01 13:16:37 -0500 |
---|---|---|
committer | Austin Clements <austin@google.com> | 2019-04-05 18:49:03 +0000 |
commit | 2b605670020fc637e20a609b1dc86c1f0e7afdd1 (patch) | |
tree | fa5ecc1c95a04fe0c1009fd6d1702aec631df295 /src/sync | |
parent | e47ced78578c471cbcd34a7d6b223a71e84a46c8 (diff) | |
download | go-2b605670020fc637e20a609b1dc86c1f0e7afdd1.tar.gz go-2b605670020fc637e20a609b1dc86c1f0e7afdd1.zip |
sync: internal fixed size lock-free queue for sync.Pool
This is the first step toward fixing multiple issues with sync.Pool.
This adds a fixed size, lock-free, single-producer, multi-consumer
queue that will be used in the new Pool stealing implementation.
For #22950, #22331.
Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5
Reviewed-on: https://go-review.googlesource.com/c/go/+/166957
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
Diffstat (limited to 'src/sync')
-rw-r--r-- | src/sync/export_test.go | 25 | ||||
-rw-r--r-- | src/sync/pool_test.go | 73 | ||||
-rw-r--r-- | src/sync/poolqueue.go | 185 |
3 files changed, 283 insertions, 0 deletions
diff --git a/src/sync/export_test.go b/src/sync/export_test.go index 669076efad..0252b64f58 100644 --- a/src/sync/export_test.go +++ b/src/sync/export_test.go @@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire var Runtime_Semrelease = runtime_Semrelease var Runtime_procPin = runtime_procPin var Runtime_procUnpin = runtime_procUnpin + +// poolDequeue testing. +type PoolDequeue interface { + PushHead(val interface{}) bool + PopHead() (interface{}, bool) + PopTail() (interface{}, bool) +} + +func NewPoolDequeue(n int) PoolDequeue { + return &poolDequeue{ + vals: make([]eface, n), + } +} + +func (d *poolDequeue) PushHead(val interface{}) bool { + return d.pushHead(val) +} + +func (d *poolDequeue) PopHead() (interface{}, bool) { + return d.popHead() +} + +func (d *poolDequeue) PopTail() (interface{}, bool) { + return d.popTail() +} diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go index 9e5132bb18..6e9f9f3463 100644 --- a/src/sync/pool_test.go +++ b/src/sync/pool_test.go @@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) { } } +func TestPoolDequeue(t *testing.T) { + const P = 10 + // In long mode, do enough pushes to wrap around the 21-bit + // indexes. + N := 1<<21 + 1000 + if testing.Short() { + N = 1e3 + } + d := NewPoolDequeue(16) + have := make([]int32, N) + var stop int32 + var wg WaitGroup + + // Start P-1 consumers. + for i := 1; i < P; i++ { + wg.Add(1) + go func() { + fail := 0 + for atomic.LoadInt32(&stop) == 0 { + val, ok := d.PopTail() + if ok { + fail = 0 + atomic.AddInt32(&have[val.(int)], 1) + if val.(int) == N-1 { + atomic.StoreInt32(&stop, 1) + } + } else { + // Speed up the test by + // allowing the pusher to run. + if fail++; fail%100 == 0 { + runtime.Gosched() + } + } + } + wg.Done() + }() + } + + // Start 1 producer. + nPopHead := 0 + wg.Add(1) + go func() { + for j := 0; j < N; j++ { + for !d.PushHead(j) { + // Allow a popper to run. + runtime.Gosched() + } + if j%10 == 0 { + val, ok := d.PopHead() + if ok { + nPopHead++ + atomic.AddInt32(&have[val.(int)], 1) + } + } + } + wg.Done() + }() + wg.Wait() + + // Check results. + for i, count := range have { + if count != 1 { + t.Errorf("expected have[%d] = 1, got %d", i, count) + } + } + if nPopHead == 0 { + // In theory it's possible in a valid schedule for + // popHead to never succeed, but in practice it almost + // always succeeds, so this is unlikely to flake. + t.Errorf("popHead never succeeded") + } +} + func BenchmarkPool(b *testing.B) { var p Pool b.RunParallel(func(pb *testing.PB) { diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go new file mode 100644 index 0000000000..bc2ab647ff --- /dev/null +++ b/src/sync/poolqueue.go @@ -0,0 +1,185 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This is half of 1<<dequeueBits because detecting fullness depends +// on wrapping around the ring buffer without wrapping around the +// index. +const dequeueLimit = (1 << dequeueBits) / 2 + +// dequeueNil is used in poolDeqeue to represent interface{}(nil). +// Since we use nil to represent empty slots, we need a sentinel value +// to represent nil. +type dequeueNil *struct{} + +func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { + const mask = 1<<dequeueBits - 1 + head = uint32((ptrs >> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<<dequeueBits - 1 + return (uint64(head) << dequeueBits) | + uint64(tail&mask) +} + +// pushHead adds val at the head of the queue. It returns false if the +// queue is full. It must only be called by a single producer. +func (d *poolDequeue) pushHead(val interface{}) bool { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { + // Queue is full. + return false + } + slot := &d.vals[head&uint32(len(d.vals)-1)] + + // Check if the head slot has been released by popTail. + typ := atomic.LoadPointer(&slot.typ) + if typ != nil { + // Another goroutine is still cleaning up the tail, so + // the queue is actually still full. + return false + } + + // The head slot is free, so we own it. + if val == nil { + val = dequeueNil(nil) + } + *(*interface{})(unsafe.Pointer(slot)) = val + + // Increment head. This passes ownership of slot to popTail + // and acts as a store barrier for writing the slot. + atomic.AddUint64(&d.headTail, 1<<dequeueBits) + return true +} + +// popHead removes and returns the element at the head of the queue. +// It returns false if the queue is empty. It must only be called by a +// single producer. +func (d *poolDequeue) popHead() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm tail and decrement head. We do this before + // reading the value to take back ownership of this + // slot. + head-- + ptrs2 := d.pack(head, tail) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // We successfully took back slot. + slot = &d.vals[head&uint32(len(d.vals)-1)] + break + } + } + + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + // Zero the slot. Unlike popTail, this isn't racing with + // pushHead, so we don't need to be careful here. + *slot = eface{} + return val, true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *poolDequeue) popTail() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm head and tail (for our speculative check + // above) and increment tail. If this succeeds, then + // we own the slot at tail. + ptrs2 := d.pack(head, tail+1) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // Success. + slot = &d.vals[tail&uint32(len(d.vals)-1)] + break + } + } + + // We now own slot. + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + // this slot by atomically writing to typ. + slot.val = nil + atomic.StorePointer(&slot.typ, nil) + // At this point pushHead owns the slot. + + return val, true +} |