diff options
author | Keith Randall <khr@golang.org> | 2015-03-20 14:12:05 -0700 |
---|---|---|
committer | Keith Randall <khr@golang.org> | 2015-11-05 15:41:05 +0000 |
commit | 8e496f1d6923172291658f0a785bdb47cc152325 (patch) | |
tree | a0fe95741274b4a9ab7da6327e0fa7778d77f1e9 /src/runtime/select.go | |
parent | 7bb2a7d63b0767f9c6fa0382ed9185ba21757095 (diff) | |
download | go-8e496f1d6923172291658f0a785bdb47cc152325.tar.gz go-8e496f1d6923172291658f0a785bdb47cc152325.zip |
runtime: simplify buffered channels.
This change removes the retry mechanism we use for buffered channels.
Instead, any sender waking up a receiver or vice versa completes the
full protocol with its counterpart. This means the counterpart does
not need to relock the channel when it wakes up. (Currently
buffered channels need to relock on wakeup.)
For sends on a channel with waiting receivers, this change replaces
two copies (sender->queue, queue->receiver) with one (sender->receiver).
For receives on channels with a waiting sender, two copies are still required.
This change unifies to a large degree the algorithm for buffered
and unbuffered channels, simplifying the overall implementation.
Fixes #11506
benchmark old ns/op new ns/op delta
BenchmarkChanProdCons10 125 110 -12.00%
BenchmarkChanProdCons0 303 284 -6.27%
BenchmarkChanProdCons100 75.5 71.3 -5.56%
BenchmarkChanContended 6452 6125 -5.07%
BenchmarkChanNonblocking 11.5 11.0 -4.35%
BenchmarkChanCreation 149 143 -4.03%
BenchmarkChanSem 63.6 61.6 -3.14%
BenchmarkChanUncontended 6390 6212 -2.79%
BenchmarkChanSync 282 276 -2.13%
BenchmarkChanProdConsWork10 516 506 -1.94%
BenchmarkChanProdConsWork0 696 685 -1.58%
BenchmarkChanProdConsWork100 470 469 -0.21%
BenchmarkChanPopular 660427 660012 -0.06%
Change-Id: I164113a56432fbc7cace0786e49c5a6e6a708ea4
Reviewed-on: https://go-review.googlesource.com/9345
Run-TryBot: Keith Randall <khr@golang.org>
Reviewed-by: Austin Clements <austin@google.com>
Reviewed-by: Dmitry Vyukov <dvyukov@google.com>
Diffstat (limited to 'src/runtime/select.go')
-rw-r--r-- | src/runtime/select.go | 118 |
1 files changed, 32 insertions, 86 deletions
diff --git a/src/runtime/select.go b/src/runtime/select.go index 8b6c3ed4c0..508a19b630 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) { k *scase sglist *sudog sgnext *sudog - futile byte + qp unsafe.Pointer ) loop: @@ -317,15 +317,12 @@ loop: switch cas.kind { case caseRecv: - if c.dataqsiz > 0 { - if c.qcount > 0 { - goto asyncrecv - } - } else { - sg = c.sendq.dequeue() - if sg != nil { - goto syncrecv - } + sg = c.sendq.dequeue() + if sg != nil { + goto recv + } + if c.qcount > 0 { + goto bufrecv } if c.closed != 0 { goto rclose @@ -338,15 +335,12 @@ loop: if c.closed != 0 { goto sclose } - if c.dataqsiz > 0 { - if c.qcount < c.dataqsiz { - goto asyncsend - } - } else { - sg = c.recvq.dequeue() - if sg != nil { - goto syncsend - } + sg = c.recvq.dequeue() + if sg != nil { + goto send + } + if c.qcount < c.dataqsiz { + goto bufsend } case caseDefault: @@ -363,6 +357,9 @@ loop: // pass 2 - enqueue on all chans gp = getg() done = 0 + if gp.waiting != nil { + throw("gp.waiting != nil") + } for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c @@ -389,7 +386,7 @@ loop: // wait for someone to wake us up gp.param = nil - gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2) + gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2) // someone woke us up sellock(sel) @@ -432,16 +429,13 @@ loop: } if cas == nil { - futile = traceFutileWakeup + // This can happen if we were woken up by a close(). + // TODO: figure that out explicitly so we don't need this loop. goto loop } c = cas.c - if c.dataqsiz > 0 { - throw("selectgo: shouldn't happen") - } - if debugSelect { print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n") } @@ -470,7 +464,7 @@ loop: selunlock(sel) goto retc -asyncrecv: +bufrecv: // can receive from buffer if raceenabled { if cas.elem != nil { @@ -485,29 +479,20 @@ asyncrecv: if cas.receivedp != nil { *cas.receivedp = true } + qp = chanbuf(c, c.recvx) if cas.elem != nil { - typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx)) + typedmemmove(c.elemtype, cas.elem, qp) } - memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) + memclr(qp, uintptr(c.elemsize)) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- - sg = c.sendq.dequeue() - if sg != nil { - gp = sg.g - selunlock(sel) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) - } else { - selunlock(sel) - } + selunlock(sel) goto retc -asyncsend: +bufsend: // can send to buffer if raceenabled { raceacquire(chanbuf(c, c.sendx)) @@ -523,47 +508,18 @@ asyncsend: c.sendx = 0 } c.qcount++ - sg = c.recvq.dequeue() - if sg != nil { - gp = sg.g - selunlock(sel) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) - } else { - selunlock(sel) - } + selunlock(sel) goto retc -syncrecv: +recv: // can receive from sleeping sender (sg) - if raceenabled { - if cas.elem != nil { - raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc) - } - racesync(c, sg) - } - if msanenabled && cas.elem != nil { - msanwrite(cas.elem, c.elemtype.size) - } - selunlock(sel) + recv(c, sg, cas.elem, func() { selunlock(sel) }) if debugSelect { print("syncrecv: sel=", sel, " c=", c, "\n") } if cas.receivedp != nil { *cas.receivedp = true } - if cas.elem != nil { - typedmemmove(c.elemtype, cas.elem, sg.elem) - } - sg.elem = nil - gp = sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) goto retc rclose: @@ -580,29 +536,19 @@ rclose: } goto retc -syncsend: - // can send to sleeping receiver (sg) +send: + // can send to a sleeping receiver (sg) if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc) - racesync(c, sg) } if msanenabled { msanread(cas.elem, c.elemtype.size) } - selunlock(sel) + send(c, sg, cas.elem, func() { selunlock(sel) }) if debugSelect { print("syncsend: sel=", sel, " c=", c, "\n") } - if sg.elem != nil { - syncsend(c, sg, cas.elem) - } - sg.elem = nil - gp = sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) + goto retc retc: if cas.releasetime > 0 { |