aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/chan.go
diff options
context:
space:
mode:
authorKeith Randall <khr@golang.org>2015-11-06 08:29:53 +0000
committerKeith Randall <khr@golang.org>2015-11-06 08:30:35 +0000
commite9f90ba246dab09943a2578a77b017186e8cb661 (patch)
tree4768b8299cc26b5b8bdf77f363513a537d821269 /src/runtime/chan.go
parent26263354a3d607e1cc6c06be67530dad57f43074 (diff)
downloadgo-e9f90ba246dab09943a2578a77b017186e8cb661.tar.gz
go-e9f90ba246dab09943a2578a77b017186e8cb661.zip
Revert "runtime: simplify buffered channels."
Revert for now until #13169 is understood. This reverts commit 8e496f1d6923172291658f0a785bdb47cc152325. Change-Id: Ib3eb2588824ef47a2b6eb9e377a24e5c817fcc81 Reviewed-on: https://go-review.googlesource.com/16716 Reviewed-by: Keith Randall <khr@golang.org>
Diffstat (limited to 'src/runtime/chan.go')
-rw-r--r--src/runtime/chan.go495
1 files changed, 263 insertions, 232 deletions
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 966e4a9743..96ac306624 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -6,11 +6,6 @@ package runtime
// This file contains the implementation of Go channels.
-// Invariants:
-// At least one of c.sendq and c.recvq is empty.
-// For buffered channels, also:
-// c.qcount > 0 implies that c.recvq is empty.
-// c.qcount < c.dataqsiz implies that c.sendq is empty.
import "unsafe"
const (
@@ -158,117 +153,135 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
}
lock(&c.lock)
-
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
- if sg := c.recvq.dequeue(); sg != nil {
- // Found a waiting receiver. We pass the value we want to send
- // directly to the receiver, bypassing the channel buffer (if any).
- send(c, sg, ep, func() { unlock(&c.lock) })
- return true
- }
+ if c.dataqsiz == 0 { // synchronous channel
+ sg := c.recvq.dequeue()
+ if sg != nil { // found a waiting receiver
+ if raceenabled {
+ racesync(c, sg)
+ }
+ unlock(&c.lock)
- if c.qcount < c.dataqsiz {
- // Space is available in the channel buffer. Enqueue the element to send.
- qp := chanbuf(c, c.sendx)
- if raceenabled {
- raceacquire(qp)
- racerelease(qp)
+ recvg := sg.g
+ if sg.elem != nil {
+ syncsend(c, sg, ep)
+ }
+ recvg.param = unsafe.Pointer(sg)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
+ }
+ goready(recvg, 3)
+ return true
}
- typedmemmove(c.elemtype, qp, ep)
- c.sendx++
- if c.sendx == c.dataqsiz {
- c.sendx = 0
+
+ if !block {
+ unlock(&c.lock)
+ return false
}
- c.qcount++
- unlock(&c.lock)
+
+ // no receiver available: block on this channel.
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = ep
+ mysg.waitlink = nil
+ gp.waiting = mysg
+ mysg.g = gp
+ mysg.selectdone = nil
+ gp.param = nil
+ c.sendq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
+
+ // someone woke us up.
+ if mysg != gp.waiting {
+ throw("G waiting list is corrupted!")
+ }
+ gp.waiting = nil
+ if gp.param == nil {
+ if c.closed == 0 {
+ throw("chansend: spurious wakeup")
+ }
+ panic("send on closed channel")
+ }
+ gp.param = nil
+ if mysg.releasetime > 0 {
+ blockevent(int64(mysg.releasetime)-t0, 2)
+ }
+ releaseSudog(mysg)
return true
}
- if !block {
- unlock(&c.lock)
- return false
+ // asynchronous channel
+ // wait for some space to write our data
+ var t1 int64
+ for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
+ if !block {
+ unlock(&c.lock)
+ return false
+ }
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.g = gp
+ mysg.elem = nil
+ mysg.selectdone = nil
+ c.sendq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
+
+ // someone woke us up - try again
+ if mysg.releasetime > 0 {
+ t1 = mysg.releasetime
+ }
+ releaseSudog(mysg)
+ lock(&c.lock)
+ if c.closed != 0 {
+ unlock(&c.lock)
+ panic("send on closed channel")
+ }
}
- // Block on the channel. Some receiver will complete our operation for us.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = ep
- mysg.waitlink = nil
- mysg.g = gp
- mysg.selectdone = nil
- gp.waiting = mysg
- gp.param = nil
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
-
- // someone woke us up.
- if mysg != gp.waiting {
- throw("G waiting list is corrupted")
- }
- gp.waiting = nil
- if gp.param == nil {
- if c.closed == 0 {
- throw("chansend: spurious wakeup")
- }
- panic("send on closed channel")
+ // write our data into the channel buffer
+ if raceenabled {
+ raceacquire(chanbuf(c, c.sendx))
+ racerelease(chanbuf(c, c.sendx))
}
- gp.param = nil
- if mysg.releasetime > 0 {
- blockevent(int64(mysg.releasetime)-t0, 2)
+ typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
+ c.sendx++
+ if c.sendx == c.dataqsiz {
+ c.sendx = 0
}
- releaseSudog(mysg)
- return true
-}
+ c.qcount++
-// send processes a send operation on an empty channel c.
-// The value ep sent by the sender is copied to the receiver sg.
-// The receiver is then woken up to go on its merry way.
-// Channel c must be empty and locked. send unlocks c with unlockf.
-// sg must already be dequeued from c.
-// ep must be non-nil and point to the heap or the caller's stack.
-func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
- if raceenabled {
- if c.dataqsiz == 0 {
- racesync(c, sg)
- } else {
- // Pretend we go through the buffer, even though
- // we copy directly. Note that we need to increment
- // the head/tail locations only when raceenabled.
- qp := chanbuf(c, c.recvx)
- raceacquire(qp)
- racerelease(qp)
- raceacquireg(sg.g, qp)
- racereleaseg(sg.g, qp)
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
+ // wake up a waiting receiver
+ sg := c.recvq.dequeue()
+ if sg != nil {
+ recvg := sg.g
+ unlock(&c.lock)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
}
+ goready(recvg, 3)
+ } else {
+ unlock(&c.lock)
}
- unlockf()
- if sg.elem != nil {
- sendDirect(c.elemtype, sg.elem, ep)
- sg.elem = nil
- }
- gp := sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
+ if t1 > 0 {
+ blockevent(t1-t0, 2)
}
- goready(gp, 4)
+ return true
}
-func sendDirect(t *_type, dst, src unsafe.Pointer) {
- // Send on an unbuffered or empty-buffered channel is the only operation
+func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
+ // Send on unbuffered channel is the only operation
// in the entire runtime where one goroutine
// writes to the stack of another goroutine. The GC assumes that
// stack writes only happen when the goroutine is running and are
@@ -277,8 +290,9 @@ func sendDirect(t *_type, dst, src unsafe.Pointer) {
// typedmemmove will call heapBitsBulkBarrier, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.
- memmove(dst, src, t.size)
- typeBitsBulkBarrier(t, uintptr(dst), t.size)
+ memmove(sg.elem, elem, c.elemtype.size)
+ typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
+ sg.elem = nil
}
func closechan(c *hchan) {
@@ -306,36 +320,27 @@ func closechan(c *hchan) {
if sg == nil {
break
}
- if sg.elem != nil {
- memclr(sg.elem, uintptr(c.elemsize))
- sg.elem = nil
- }
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
gp := sg.g
+ sg.elem = nil
gp.param = nil
- if raceenabled {
- raceacquireg(gp, unsafe.Pointer(c))
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
}
goready(gp, 3)
}
- // release all writers (they will panic)
+ // release all writers
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
+ gp := sg.g
sg.elem = nil
+ gp.param = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
- gp := sg.g
- gp.param = nil
- if raceenabled {
- raceacquireg(gp, unsafe.Pointer(c))
- }
goready(gp, 3)
}
unlock(&c.lock)
@@ -358,10 +363,8 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
-// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
- // raceenabled: don't need to check ep, as it is always on the stack
- // or is new memory allocated by reflect.
+ // raceenabled: don't need to check ep, as it is always on the stack.
if debugChan {
print("chanrecv: chan=", c, "\n")
@@ -399,139 +402,167 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
}
lock(&c.lock)
+ if c.dataqsiz == 0 { // synchronous channel
+ if c.closed != 0 {
+ return recvclosed(c, ep)
+ }
+
+ sg := c.sendq.dequeue()
+ if sg != nil {
+ if raceenabled {
+ racesync(c, sg)
+ }
+ unlock(&c.lock)
- if c.closed != 0 && c.qcount == 0 {
- if raceenabled {
- raceacquire(unsafe.Pointer(c))
+ if ep != nil {
+ typedmemmove(c.elemtype, ep, sg.elem)
+ }
+ sg.elem = nil
+ gp := sg.g
+ gp.param = unsafe.Pointer(sg)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
+ }
+ goready(gp, 3)
+ selected = true
+ received = true
+ return
}
- unlock(&c.lock)
- if ep != nil {
- memclr(ep, uintptr(c.elemsize))
+
+ if !block {
+ unlock(&c.lock)
+ return
}
- return true, false
- }
- if sg := c.sendq.dequeue(); sg != nil {
- // Found a waiting sender. If buffer is size 0, receive value
- // directly from sender. Otherwise, recieve from head of queue
- // and add sender's value to the tail of the queue (both map to
- // the same buffer slot because the queue is full).
- recv(c, sg, ep, func() { unlock(&c.lock) })
- return true, true
+ // no sender available: block on this channel.
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = ep
+ mysg.waitlink = nil
+ gp.waiting = mysg
+ mysg.g = gp
+ mysg.selectdone = nil
+ gp.param = nil
+ c.recvq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
+
+ // someone woke us up
+ if mysg != gp.waiting {
+ throw("G waiting list is corrupted!")
+ }
+ gp.waiting = nil
+ if mysg.releasetime > 0 {
+ blockevent(mysg.releasetime-t0, 2)
+ }
+ haveData := gp.param != nil
+ gp.param = nil
+ releaseSudog(mysg)
+
+ if haveData {
+ // a sender sent us some data. It already wrote to ep.
+ selected = true
+ received = true
+ return
+ }
+
+ lock(&c.lock)
+ if c.closed == 0 {
+ throw("chanrecv: spurious wakeup")
+ }
+ return recvclosed(c, ep)
}
- if c.qcount > 0 {
- // Receive directly from queue
- qp := chanbuf(c, c.recvx)
- if raceenabled {
- raceacquire(qp)
- racerelease(qp)
+ // asynchronous channel
+ // wait for some data to appear
+ var t1 int64
+ for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
+ if c.closed != 0 {
+ selected, received = recvclosed(c, ep)
+ if t1 > 0 {
+ blockevent(t1-t0, 2)
+ }
+ return
+ }
+
+ if !block {
+ unlock(&c.lock)
+ return
}
- if ep != nil {
- typedmemmove(c.elemtype, ep, qp)
+
+ // wait for someone to send an element
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
}
- memclr(qp, uintptr(c.elemsize))
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
+ mysg.elem = nil
+ mysg.g = gp
+ mysg.selectdone = nil
+
+ c.recvq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
+
+ // someone woke us up - try again
+ if mysg.releasetime > 0 {
+ t1 = mysg.releasetime
}
- c.qcount--
- unlock(&c.lock)
- return true, true
+ releaseSudog(mysg)
+ lock(&c.lock)
}
- if !block {
- unlock(&c.lock)
- return false, false
- }
-
- // no sender available: block on this channel.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = ep
- mysg.waitlink = nil
- gp.waiting = mysg
- mysg.g = gp
- mysg.selectdone = nil
- gp.param = nil
- c.recvq.enqueue(mysg)
- goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
-
- // someone woke us up
- if mysg != gp.waiting {
- throw("G waiting list is corrupted")
- }
- gp.waiting = nil
- if mysg.releasetime > 0 {
- blockevent(mysg.releasetime-t0, 2)
- }
- closed := gp.param == nil
- gp.param = nil
- releaseSudog(mysg)
- return true, !closed
-}
+ if raceenabled {
+ raceacquire(chanbuf(c, c.recvx))
+ racerelease(chanbuf(c, c.recvx))
+ }
+ if ep != nil {
+ typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
+ }
+ memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
-// recv processes a receive operation on a full channel c.
-// There are 2 parts:
-// 1) The value sent by the sender sg is put into the channel
-// and the sender is woken up to go on its merry way.
-// 2) The value received by the receiver (the current G) is
-// written to ep.
-// For synchronous channels, both values are the same.
-// For asynchronous channels, the receiver gets its data from
-// the channel buffer and the sender's data is put in the
-// channel buffer.
-// Channel c must be full and locked. recv unlocks c with unlockf.
-// sg must already be dequeued from c.
-// A non-nil ep must point to the heap or the caller's stack.
-func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
- if c.dataqsiz == 0 {
- if raceenabled {
- racesync(c, sg)
- }
- unlockf()
- if ep != nil {
- // copy data from sender
- // ep points to our own stack or heap, so nothing
- // special (ala sendDirect) needed here.
- typedmemmove(c.elemtype, ep, sg.elem)
+ c.recvx++
+ if c.recvx == c.dataqsiz {
+ c.recvx = 0
+ }
+ c.qcount--
+
+ // ping a sender now that there is space
+ sg := c.sendq.dequeue()
+ if sg != nil {
+ gp := sg.g
+ unlock(&c.lock)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
}
+ goready(gp, 3)
} else {
- // Queue is full. Take the item at the
- // head of the queue. Make the sender enqueue
- // its item at the tail of the queue. Since the
- // queue is full, those are both the same slot.
- qp := chanbuf(c, c.recvx)
- if raceenabled {
- raceacquire(qp)
- racerelease(qp)
- raceacquireg(sg.g, qp)
- racereleaseg(sg.g, qp)
- }
- // copy data from queue to receiver
- if ep != nil {
- typedmemmove(c.elemtype, ep, qp)
- }
- // copy data from sender to queue
- typedmemmove(c.elemtype, qp, sg.elem)
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
- unlockf()
+ unlock(&c.lock)
}
- sg.elem = nil
- gp := sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
+
+ if t1 > 0 {
+ blockevent(t1-t0, 2)
+ }
+ selected = true
+ received = true
+ return
+}
+
+// recvclosed is a helper function for chanrecv. Handles cleanup
+// when the receiver encounters a closed channel.
+// Caller must hold c.lock, recvclosed will release the lock.
+func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
+ if raceenabled {
+ raceacquire(unsafe.Pointer(c))
+ }
+ unlock(&c.lock)
+ if ep != nil {
+ memclr(ep, uintptr(c.elemsize))
}
- goready(gp, 4)
+ return true, false
}
// compiler implements