diff options
author | Keith Randall <khr@golang.org> | 2015-11-06 08:29:53 +0000 |
---|---|---|
committer | Keith Randall <khr@golang.org> | 2015-11-06 08:30:35 +0000 |
commit | e9f90ba246dab09943a2578a77b017186e8cb661 (patch) | |
tree | 4768b8299cc26b5b8bdf77f363513a537d821269 /src/runtime/chan.go | |
parent | 26263354a3d607e1cc6c06be67530dad57f43074 (diff) | |
download | go-e9f90ba246dab09943a2578a77b017186e8cb661.tar.gz go-e9f90ba246dab09943a2578a77b017186e8cb661.zip |
Revert "runtime: simplify buffered channels."
Revert for now until #13169 is understood.
This reverts commit 8e496f1d6923172291658f0a785bdb47cc152325.
Change-Id: Ib3eb2588824ef47a2b6eb9e377a24e5c817fcc81
Reviewed-on: https://go-review.googlesource.com/16716
Reviewed-by: Keith Randall <khr@golang.org>
Diffstat (limited to 'src/runtime/chan.go')
-rw-r--r-- | src/runtime/chan.go | 495 |
1 files changed, 263 insertions, 232 deletions
diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 966e4a9743..96ac306624 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -6,11 +6,6 @@ package runtime // This file contains the implementation of Go channels. -// Invariants: -// At least one of c.sendq and c.recvq is empty. -// For buffered channels, also: -// c.qcount > 0 implies that c.recvq is empty. -// c.qcount < c.dataqsiz implies that c.sendq is empty. import "unsafe" const ( @@ -158,117 +153,135 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin } lock(&c.lock) - if c.closed != 0 { unlock(&c.lock) panic("send on closed channel") } - if sg := c.recvq.dequeue(); sg != nil { - // Found a waiting receiver. We pass the value we want to send - // directly to the receiver, bypassing the channel buffer (if any). - send(c, sg, ep, func() { unlock(&c.lock) }) - return true - } + if c.dataqsiz == 0 { // synchronous channel + sg := c.recvq.dequeue() + if sg != nil { // found a waiting receiver + if raceenabled { + racesync(c, sg) + } + unlock(&c.lock) - if c.qcount < c.dataqsiz { - // Space is available in the channel buffer. Enqueue the element to send. - qp := chanbuf(c, c.sendx) - if raceenabled { - raceacquire(qp) - racerelease(qp) + recvg := sg.g + if sg.elem != nil { + syncsend(c, sg, ep) + } + recvg.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(recvg, 3) + return true } - typedmemmove(c.elemtype, qp, ep) - c.sendx++ - if c.sendx == c.dataqsiz { - c.sendx = 0 + + if !block { + unlock(&c.lock) + return false } - c.qcount++ - unlock(&c.lock) + + // no receiver available: block on this channel. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = ep + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) + + // someone woke us up. + if mysg != gp.waiting { + throw("G waiting list is corrupted!") + } + gp.waiting = nil + if gp.param == nil { + if c.closed == 0 { + throw("chansend: spurious wakeup") + } + panic("send on closed channel") + } + gp.param = nil + if mysg.releasetime > 0 { + blockevent(int64(mysg.releasetime)-t0, 2) + } + releaseSudog(mysg) return true } - if !block { - unlock(&c.lock) - return false + // asynchronous channel + // wait for some space to write our data + var t1 int64 + for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup { + if !block { + unlock(&c.lock) + return false + } + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.g = gp + mysg.elem = nil + mysg.selectdone = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3) + + // someone woke us up - try again + if mysg.releasetime > 0 { + t1 = mysg.releasetime + } + releaseSudog(mysg) + lock(&c.lock) + if c.closed != 0 { + unlock(&c.lock) + panic("send on closed channel") + } } - // Block on the channel. Some receiver will complete our operation for us. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - mysg.g = gp - mysg.selectdone = nil - gp.waiting = mysg - gp.param = nil - c.sendq.enqueue(mysg) - goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) - - // someone woke us up. - if mysg != gp.waiting { - throw("G waiting list is corrupted") - } - gp.waiting = nil - if gp.param == nil { - if c.closed == 0 { - throw("chansend: spurious wakeup") - } - panic("send on closed channel") + // write our data into the channel buffer + if raceenabled { + raceacquire(chanbuf(c, c.sendx)) + racerelease(chanbuf(c, c.sendx)) } - gp.param = nil - if mysg.releasetime > 0 { - blockevent(int64(mysg.releasetime)-t0, 2) + typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 } - releaseSudog(mysg) - return true -} + c.qcount++ -// send processes a send operation on an empty channel c. -// The value ep sent by the sender is copied to the receiver sg. -// The receiver is then woken up to go on its merry way. -// Channel c must be empty and locked. send unlocks c with unlockf. -// sg must already be dequeued from c. -// ep must be non-nil and point to the heap or the caller's stack. -func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { - if raceenabled { - if c.dataqsiz == 0 { - racesync(c, sg) - } else { - // Pretend we go through the buffer, even though - // we copy directly. Note that we need to increment - // the head/tail locations only when raceenabled. - qp := chanbuf(c, c.recvx) - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 - } - c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz + // wake up a waiting receiver + sg := c.recvq.dequeue() + if sg != nil { + recvg := sg.g + unlock(&c.lock) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } + goready(recvg, 3) + } else { + unlock(&c.lock) } - unlockf() - if sg.elem != nil { - sendDirect(c.elemtype, sg.elem, ep) - sg.elem = nil - } - gp := sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() + if t1 > 0 { + blockevent(t1-t0, 2) } - goready(gp, 4) + return true } -func sendDirect(t *_type, dst, src unsafe.Pointer) { - // Send on an unbuffered or empty-buffered channel is the only operation +func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) { + // Send on unbuffered channel is the only operation // in the entire runtime where one goroutine // writes to the stack of another goroutine. The GC assumes that // stack writes only happen when the goroutine is running and are @@ -277,8 +290,9 @@ func sendDirect(t *_type, dst, src unsafe.Pointer) { // typedmemmove will call heapBitsBulkBarrier, but the target bytes // are not in the heap, so that will not help. We arrange to call // memmove and typeBitsBulkBarrier instead. - memmove(dst, src, t.size) - typeBitsBulkBarrier(t, uintptr(dst), t.size) + memmove(sg.elem, elem, c.elemtype.size) + typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size) + sg.elem = nil } func closechan(c *hchan) { @@ -306,36 +320,27 @@ func closechan(c *hchan) { if sg == nil { break } - if sg.elem != nil { - memclr(sg.elem, uintptr(c.elemsize)) - sg.elem = nil - } - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } gp := sg.g + sg.elem = nil gp.param = nil - if raceenabled { - raceacquireg(gp, unsafe.Pointer(c)) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } goready(gp, 3) } - // release all writers (they will panic) + // release all writers for { sg := c.sendq.dequeue() if sg == nil { break } + gp := sg.g sg.elem = nil + gp.param = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } - gp := sg.g - gp.param = nil - if raceenabled { - raceacquireg(gp, unsafe.Pointer(c)) - } goready(gp, 3) } unlock(&c.lock) @@ -358,10 +363,8 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) { // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). -// A non-nil ep must point to the heap or the caller's stack. func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { - // raceenabled: don't need to check ep, as it is always on the stack - // or is new memory allocated by reflect. + // raceenabled: don't need to check ep, as it is always on the stack. if debugChan { print("chanrecv: chan=", c, "\n") @@ -399,139 +402,167 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r } lock(&c.lock) + if c.dataqsiz == 0 { // synchronous channel + if c.closed != 0 { + return recvclosed(c, ep) + } + + sg := c.sendq.dequeue() + if sg != nil { + if raceenabled { + racesync(c, sg) + } + unlock(&c.lock) - if c.closed != 0 && c.qcount == 0 { - if raceenabled { - raceacquire(unsafe.Pointer(c)) + if ep != nil { + typedmemmove(c.elemtype, ep, sg.elem) + } + sg.elem = nil + gp := sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) + selected = true + received = true + return } - unlock(&c.lock) - if ep != nil { - memclr(ep, uintptr(c.elemsize)) + + if !block { + unlock(&c.lock) + return } - return true, false - } - if sg := c.sendq.dequeue(); sg != nil { - // Found a waiting sender. If buffer is size 0, receive value - // directly from sender. Otherwise, recieve from head of queue - // and add sender's value to the tail of the queue (both map to - // the same buffer slot because the queue is full). - recv(c, sg, ep, func() { unlock(&c.lock) }) - return true, true + // no sender available: block on this channel. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = ep + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.recvq.enqueue(mysg) + goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) + + // someone woke us up + if mysg != gp.waiting { + throw("G waiting list is corrupted!") + } + gp.waiting = nil + if mysg.releasetime > 0 { + blockevent(mysg.releasetime-t0, 2) + } + haveData := gp.param != nil + gp.param = nil + releaseSudog(mysg) + + if haveData { + // a sender sent us some data. It already wrote to ep. + selected = true + received = true + return + } + + lock(&c.lock) + if c.closed == 0 { + throw("chanrecv: spurious wakeup") + } + return recvclosed(c, ep) } - if c.qcount > 0 { - // Receive directly from queue - qp := chanbuf(c, c.recvx) - if raceenabled { - raceacquire(qp) - racerelease(qp) + // asynchronous channel + // wait for some data to appear + var t1 int64 + for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup { + if c.closed != 0 { + selected, received = recvclosed(c, ep) + if t1 > 0 { + blockevent(t1-t0, 2) + } + return + } + + if !block { + unlock(&c.lock) + return } - if ep != nil { - typedmemmove(c.elemtype, ep, qp) + + // wait for someone to send an element + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 } - memclr(qp, uintptr(c.elemsize)) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 + mysg.elem = nil + mysg.g = gp + mysg.selectdone = nil + + c.recvq.enqueue(mysg) + goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3) + + // someone woke us up - try again + if mysg.releasetime > 0 { + t1 = mysg.releasetime } - c.qcount-- - unlock(&c.lock) - return true, true + releaseSudog(mysg) + lock(&c.lock) } - if !block { - unlock(&c.lock) - return false, false - } - - // no sender available: block on this channel. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - gp.waiting = mysg - mysg.g = gp - mysg.selectdone = nil - gp.param = nil - c.recvq.enqueue(mysg) - goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) - - // someone woke us up - if mysg != gp.waiting { - throw("G waiting list is corrupted") - } - gp.waiting = nil - if mysg.releasetime > 0 { - blockevent(mysg.releasetime-t0, 2) - } - closed := gp.param == nil - gp.param = nil - releaseSudog(mysg) - return true, !closed -} + if raceenabled { + raceacquire(chanbuf(c, c.recvx)) + racerelease(chanbuf(c, c.recvx)) + } + if ep != nil { + typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx)) + } + memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) -// recv processes a receive operation on a full channel c. -// There are 2 parts: -// 1) The value sent by the sender sg is put into the channel -// and the sender is woken up to go on its merry way. -// 2) The value received by the receiver (the current G) is -// written to ep. -// For synchronous channels, both values are the same. -// For asynchronous channels, the receiver gets its data from -// the channel buffer and the sender's data is put in the -// channel buffer. -// Channel c must be full and locked. recv unlocks c with unlockf. -// sg must already be dequeued from c. -// A non-nil ep must point to the heap or the caller's stack. -func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { - if c.dataqsiz == 0 { - if raceenabled { - racesync(c, sg) - } - unlockf() - if ep != nil { - // copy data from sender - // ep points to our own stack or heap, so nothing - // special (ala sendDirect) needed here. - typedmemmove(c.elemtype, ep, sg.elem) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.qcount-- + + // ping a sender now that there is space + sg := c.sendq.dequeue() + if sg != nil { + gp := sg.g + unlock(&c.lock) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } + goready(gp, 3) } else { - // Queue is full. Take the item at the - // head of the queue. Make the sender enqueue - // its item at the tail of the queue. Since the - // queue is full, those are both the same slot. - qp := chanbuf(c, c.recvx) - if raceenabled { - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) - } - // copy data from queue to receiver - if ep != nil { - typedmemmove(c.elemtype, ep, qp) - } - // copy data from sender to queue - typedmemmove(c.elemtype, qp, sg.elem) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 - } - c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz - unlockf() + unlock(&c.lock) } - sg.elem = nil - gp := sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() + + if t1 > 0 { + blockevent(t1-t0, 2) + } + selected = true + received = true + return +} + +// recvclosed is a helper function for chanrecv. Handles cleanup +// when the receiver encounters a closed channel. +// Caller must hold c.lock, recvclosed will release the lock. +func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) { + if raceenabled { + raceacquire(unsafe.Pointer(c)) + } + unlock(&c.lock) + if ep != nil { + memclr(ep, uintptr(c.elemsize)) } - goready(gp, 4) + return true, false } // compiler implements |