aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Lance Taylor <iant@golang.org>2023-07-19 16:09:35 -0700
committerGopher Robot <gobot@golang.org>2023-07-20 15:45:57 +0000
commitf51c55bfc382443b61ca3257c1ffea59eee2559f (patch)
tree646c5b640b8f420814331c164782c5000f3ecbe3
parent890b96f7abd8ba5b2243959d9b49c212a0fc4d78 (diff)
downloadgo-f51c55bfc382443b61ca3257c1ffea59eee2559f.tar.gz
go-f51c55bfc382443b61ca3257c1ffea59eee2559f.zip
runtime: adjust netpollWaiters after goroutines are ready
The runtime was adjusting netpollWaiters before the waiting goroutines were marked as ready. This could cause the scheduler to report a deadlock because there were no goroutines ready to run. Keeping netpollWaiters non-zero ensures that at least one goroutine will call netpoll(-1) from findRunnable. This does mean that if a program has network activity for a while and then never has it again, and also has no timers, then we can leave an M stranded in a call to netpoll from which it will never return. At least this won't be a common case. And it's not new; this has been a potential problem for some time. Fixes #61454 Change-Id: I17c7f891c2bb1262fda12c6929664e64686463c8 Reviewed-on: https://go-review.googlesource.com/c/go/+/511455 TryBot-Result: Gopher Robot <gobot@golang.org> Run-TryBot: Ian Lance Taylor <iant@golang.org> Reviewed-by: Michael Knyszek <mknyszek@google.com> Auto-Submit: Ian Lance Taylor <iant@golang.org> Reviewed-by: Heschi Kreinick <heschi@google.com>
-rw-r--r--src/runtime/netpoll.go58
-rw-r--r--src/runtime/netpoll_aix.go11
-rw-r--r--src/runtime/netpoll_epoll.go11
-rw-r--r--src/runtime/netpoll_fake.go4
-rw-r--r--src/runtime/netpoll_kqueue.go11
-rw-r--r--src/runtime/netpoll_solaris.go11
-rw-r--r--src/runtime/netpoll_stub.go12
-rw-r--r--src/runtime/netpoll_wasip1.go11
-rw-r--r--src/runtime/netpoll_windows.go15
-rw-r--r--src/runtime/proc.go22
10 files changed, 107 insertions, 59 deletions
diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go
index 6877b2c350..9c2e40ce8a 100644
--- a/src/runtime/netpoll.go
+++ b/src/runtime/netpoll.go
@@ -26,10 +26,12 @@ import (
// func netpollclose(fd uintptr) int32
// Disable notifications for fd. Return an errno value.
//
-// func netpoll(delta int64) gList
+// func netpoll(delta int64) (gList, int32)
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
-// Return a list of goroutines built by calling netpollready.
+// Return a list of goroutines built by calling netpollready,
+// and a delta to add to netpollWaiters when all goroutines are ready.
+// This will never return an empty list with a non-zero delta.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
@@ -426,12 +428,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
}
// If we set the new deadline in the past, unblock currently pending IO if any.
// Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
+ delta := int32(0)
var rg, wg *g
if pd.rd < 0 {
- rg = netpollunblock(pd, 'r', false)
+ rg = netpollunblock(pd, 'r', false, &delta)
}
if pd.wd < 0 {
- wg = netpollunblock(pd, 'w', false)
+ wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
if rg != nil {
@@ -440,6 +443,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
if wg != nil {
netpollgoready(wg, 3)
}
+ netpollAdjustWaiters(delta)
}
//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
@@ -453,8 +457,9 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
pd.wseq++
var rg, wg *g
pd.publishInfo()
- rg = netpollunblock(pd, 'r', false)
- wg = netpollunblock(pd, 'w', false)
+ delta := int32(0)
+ rg = netpollunblock(pd, 'r', false, &delta)
+ wg = netpollunblock(pd, 'w', false, &delta)
if pd.rt.f != nil {
deltimer(&pd.rt)
pd.rt.f = nil
@@ -470,6 +475,7 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
if wg != nil {
netpollgoready(wg, 3)
}
+ netpollAdjustWaiters(delta)
}
// netpollready is called by the platform-specific netpoll function.
@@ -478,16 +484,19 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
+// This returns a delta to apply to netpollWaiters.
+//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
-func netpollready(toRun *gList, pd *pollDesc, mode int32) {
+func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
+ delta := int32(0)
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
- rg = netpollunblock(pd, 'r', true)
+ rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
- wg = netpollunblock(pd, 'w', true)
+ wg = netpollunblock(pd, 'w', true, &delta)
}
if rg != nil {
toRun.push(rg)
@@ -495,6 +504,7 @@ func netpollready(toRun *gList, pd *pollDesc, mode int32) {
if wg != nil {
toRun.push(wg)
}
+ return delta
}
func netpollcheckerr(pd *pollDesc, mode int32) int {
@@ -520,7 +530,7 @@ func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
- netpollWaiters.Add(1)
+ netpollAdjustWaiters(1)
}
return r
}
@@ -570,7 +580,13 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
return old == pdReady
}
-func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
+// netpollunblock moves either pd.rg (if mode == 'r') or
+// pd.wg (if mode == 'w') into the pdReady state.
+// This returns any goroutine blocked on pd.{rg,wg}.
+// It adds any adjustment to netpollWaiters to *delta;
+// this adjustment should be applied after the goroutine has
+// been marked ready.
+func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
@@ -594,7 +610,7 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
if old == pdWait {
old = pdNil
} else if old != pdNil {
- netpollWaiters.Add(-1)
+ *delta -= 1
}
return (*g)(unsafe.Pointer(old))
}
@@ -614,6 +630,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
unlock(&pd.lock)
return
}
+ delta := int32(0)
var rg *g
if read {
if pd.rd <= 0 || pd.rt.f == nil {
@@ -621,7 +638,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
}
pd.rd = -1
pd.publishInfo()
- rg = netpollunblock(pd, 'r', false)
+ rg = netpollunblock(pd, 'r', false, &delta)
}
var wg *g
if write {
@@ -630,7 +647,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
}
pd.wd = -1
pd.publishInfo()
- wg = netpollunblock(pd, 'w', false)
+ wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
if rg != nil {
@@ -639,6 +656,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
if wg != nil {
netpollgoready(wg, 0)
}
+ netpollAdjustWaiters(delta)
}
func netpollDeadline(arg any, seq uintptr) {
@@ -653,6 +671,18 @@ func netpollWriteDeadline(arg any, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}
+// netpollAnyWaiters reports whether any goroutines are waiting for I/O.
+func netpollAnyWaiters() bool {
+ return netpollWaiters.Load() > 0
+}
+
+// netpollAdjustWaiters adds delta to netpollWaiters.
+func netpollAdjustWaiters(delta int32) {
+ if delta != 0 {
+ netpollWaiters.Add(delta)
+ }
+}
+
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
diff --git a/src/runtime/netpoll_aix.go b/src/runtime/netpoll_aix.go
index fad976b932..a34b4d8bcf 100644
--- a/src/runtime/netpoll_aix.go
+++ b/src/runtime/netpoll_aix.go
@@ -154,13 +154,13 @@ func netpollBreak() {
// delay > 0: block for up to that many nanoseconds
//
//go:nowritebarrierrec
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
var timeout uintptr
if delay < 0 {
timeout = ^uintptr(0)
} else if delay == 0 {
// TODO: call poll with timeout == 0
- return gList{}
+ return gList{}, 0
} else if delay < 1e6 {
timeout = 1
} else if delay < 1e15 {
@@ -186,7 +186,7 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if timeout > 0 {
- return gList{}
+ return gList{}, 0
}
goto retry
}
@@ -206,6 +206,7 @@ retry:
n--
}
var toRun gList
+ delta := int32(0)
for i := 1; i < len(pfds) && n > 0; i++ {
pfd := &pfds[i]
@@ -220,10 +221,10 @@ retry:
}
if mode != 0 {
pds[i].setEventErr(pfd.revents == _POLLERR, 0)
- netpollready(&toRun, pds[i], mode)
+ delta += netpollready(&toRun, pds[i], mode)
n--
}
}
unlock(&mtxset)
- return toRun
+ return toRun, delta
}
diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go
index e29b64dc9c..cda19fbc27 100644
--- a/src/runtime/netpoll_epoll.go
+++ b/src/runtime/netpoll_epoll.go
@@ -95,9 +95,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
if epfd == -1 {
- return gList{}
+ return gList{}, 0
}
var waitms int32
if delay < 0 {
@@ -124,11 +124,12 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
- return gList{}
+ return gList{}, 0
}
goto retry
}
var toRun gList
+ delta := int32(0)
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
@@ -164,9 +165,9 @@ retry:
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
- netpollready(&toRun, pd, mode)
+ delta += netpollready(&toRun, pd, mode)
}
}
}
- return toRun
+ return toRun, delta
}
diff --git a/src/runtime/netpoll_fake.go b/src/runtime/netpoll_fake.go
index 5319561779..41f86a85e3 100644
--- a/src/runtime/netpoll_fake.go
+++ b/src/runtime/netpoll_fake.go
@@ -30,6 +30,6 @@ func netpollarm(pd *pollDesc, mode int) {
func netpollBreak() {
}
-func netpoll(delay int64) gList {
- return gList{}
+func netpoll(delay int64) (gList, int32) {
+ return gList{}, 0
}
diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go
index 3af45e6892..33b9815965 100644
--- a/src/runtime/netpoll_kqueue.go
+++ b/src/runtime/netpoll_kqueue.go
@@ -118,9 +118,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
if kq == -1 {
- return gList{}
+ return gList{}, 0
}
var tp *timespec
var ts timespec
@@ -147,11 +147,12 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if delay > 0 {
- return gList{}
+ return gList{}, 0
}
goto retry
}
var toRun gList
+ delta := int32(0)
for i := 0; i < int(n); i++ {
ev := &events[i]
@@ -208,8 +209,8 @@ retry:
}
}
pd.setEventErr(ev.flags == _EV_ERROR, tag)
- netpollready(&toRun, pd, mode)
+ delta += netpollready(&toRun, pd, mode)
}
}
- return toRun
+ return toRun, delta
}
diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go
index 13c7ffc2ca..41f145c866 100644
--- a/src/runtime/netpoll_solaris.go
+++ b/src/runtime/netpoll_solaris.go
@@ -219,9 +219,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
if portfd == -1 {
- return gList{}
+ return gList{}, 0
}
var wait *timespec
@@ -259,12 +259,13 @@ retry:
// If a timed sleep was interrupted and there are no events,
// just return to recalculate how long we should sleep now.
if delay > 0 {
- return gList{}
+ return gList{}, 0
}
goto retry
}
var toRun gList
+ delta := int32(0)
for i := 0; i < int(n); i++ {
ev := &events[i]
@@ -324,9 +325,9 @@ retry:
// about the event port on SmartOS.
//
// See golang.org/x/issue/30840.
- netpollready(&toRun, pd, mode)
+ delta += netpollready(&toRun, pd, mode)
}
}
- return toRun
+ return toRun, delta
}
diff --git a/src/runtime/netpoll_stub.go b/src/runtime/netpoll_stub.go
index 14cf0c327f..d950661acf 100644
--- a/src/runtime/netpoll_stub.go
+++ b/src/runtime/netpoll_stub.go
@@ -9,7 +9,6 @@ package runtime
import "runtime/internal/atomic"
var netpollInited atomic.Uint32
-var netpollWaiters atomic.Uint32
var netpollStubLock mutex
var netpollNote note
@@ -34,7 +33,7 @@ func netpollBreak() {
// Polls for ready network connections.
// Returns list of goroutines that become runnable.
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
// Implementation for platforms that do not support
// integrated network poller.
if delay != 0 {
@@ -53,9 +52,16 @@ func netpoll(delay int64) gList {
// (eg when running TestNetpollBreak).
osyield()
}
- return gList{}
+ return gList{}, 0
}
func netpollinited() bool {
return netpollInited.Load() != 0
}
+
+func netpollAnyWaiters() bool {
+ return false
+}
+
+func netpollAdjustWaiters(delta int32) {
+}
diff --git a/src/runtime/netpoll_wasip1.go b/src/runtime/netpoll_wasip1.go
index 677287b30f..9903726809 100644
--- a/src/runtime/netpoll_wasip1.go
+++ b/src/runtime/netpoll_wasip1.go
@@ -184,7 +184,7 @@ func netpollclose(fd uintptr) int32 {
func netpollBreak() {}
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
lock(&mtx)
// If delay >= 0, we include a subscription of type Clock that we use as
@@ -201,7 +201,7 @@ func netpoll(delay int64) gList {
if len(pollsubs) == 0 {
unlock(&mtx)
- return gList{}
+ return gList{}, 0
}
evts = evts[:len(pollsubs)]
@@ -221,12 +221,13 @@ retry:
// recalculate how long we should sleep now.
if delay > 0 {
unlock(&mtx)
- return gList{}
+ return gList{}, 0
}
goto retry
}
var toRun gList
+ delta := int32(0)
for i := 0; i < int(nevents); i++ {
e := &evts[i]
if e.typ == eventtypeClock {
@@ -245,10 +246,10 @@ retry:
pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata)))
netpolldisarm(pd, mode)
pd.setEventErr(e.error != 0, 0)
- netpollready(&toRun, pd, mode)
+ delta += netpollready(&toRun, pd, mode)
}
}
unlock(&mtx)
- return toRun
+ return toRun, delta
}
diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go
index bb77d8d045..484a9e85b2 100644
--- a/src/runtime/netpoll_windows.go
+++ b/src/runtime/netpoll_windows.go
@@ -84,7 +84,7 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
var entries [64]overlappedEntry
var wait, qty, flags, n, i uint32
var errno int32
@@ -94,7 +94,7 @@ func netpoll(delay int64) gList {
mp := getg().m
if iocphandle == _INVALID_HANDLE_VALUE {
- return gList{}
+ return gList{}, 0
}
if delay < 0 {
wait = _INFINITE
@@ -121,12 +121,13 @@ func netpoll(delay int64) gList {
mp.blocked = false
errno = int32(getlasterror())
if errno == _WAIT_TIMEOUT {
- return gList{}
+ return gList{}, 0
}
println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")")
throw("runtime: netpoll failed")
}
mp.blocked = false
+ delta := int32(0)
for i = 0; i < n; i++ {
op = entries[i].op
if op != nil && op.pd == entries[i].key {
@@ -135,7 +136,7 @@ func netpoll(delay int64) gList {
if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
errno = int32(getlasterror())
}
- handlecompletion(&toRun, op, errno, qty)
+ delta += handlecompletion(&toRun, op, errno, qty)
} else {
netpollWakeSig.Store(0)
if delay == 0 {
@@ -145,10 +146,10 @@ func netpoll(delay int64) gList {
}
}
}
- return toRun
+ return toRun, delta
}
-func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) {
+func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) int32 {
mode := op.mode
if mode != 'r' && mode != 'w' {
println("runtime: GetQueuedCompletionStatusEx returned invalid mode=", mode)
@@ -156,5 +157,5 @@ func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) {
}
op.errno = errno
op.qty = qty
- netpollready(toRun, op.pd, mode)
+ return netpollready(toRun, op.pd, mode)
}
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 9fd200ea32..f4af26f172 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -1435,8 +1435,9 @@ func startTheWorldWithSema() int64 {
mp := acquirem() // disable preemption because it can be holding p in a local var
if netpollinited() {
- list := netpoll(0) // non-blocking
+ list, delta := netpoll(0) // non-blocking
injectglist(&list)
+ netpollAdjustWaiters(delta)
}
lock(&sched.lock)
@@ -2974,10 +2975,11 @@ top:
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
- if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
- if list := netpoll(0); !list.empty() { // non-blocking
+ if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
+ if list, delta := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
+ netpollAdjustWaiters(delta)
casgstatus(gp, _Gwaiting, _Grunnable)
if traceEnabled() {
traceGoUnpark(gp, 0)
@@ -3166,7 +3168,7 @@ top:
}
// Poll network until next timer.
- if netpollinited() && (netpollWaiters.Load() > 0 || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
+ if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
sched.pollUntil.Store(pollUntil)
if mp.p != 0 {
throw("findrunnable: netpoll with p")
@@ -3188,7 +3190,7 @@ top:
// When using fake time, just poll.
delay = 0
}
- list := netpoll(delay) // block until new work is available
+ list, delta := netpoll(delay) // block until new work is available
// Refresh now again, after potentially blocking.
now = nanotime()
sched.pollUntil.Store(0)
@@ -3204,11 +3206,13 @@ top:
unlock(&sched.lock)
if pp == nil {
injectglist(&list)
+ netpollAdjustWaiters(delta)
} else {
acquirep(pp)
if !list.empty() {
gp := list.pop()
injectglist(&list)
+ netpollAdjustWaiters(delta)
casgstatus(gp, _Gwaiting, _Grunnable)
if traceEnabled() {
traceGoUnpark(gp, 0)
@@ -3242,9 +3246,10 @@ func pollWork() bool {
if !runqempty(p) {
return true
}
- if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
- if list := netpoll(0); !list.empty() {
+ if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
+ if list, delta := netpoll(0); !list.empty() {
injectglist(&list)
+ netpollAdjustWaiters(delta)
return true
}
}
@@ -5596,7 +5601,7 @@ func sysmon() {
lastpoll := sched.lastpoll.Load()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
sched.lastpoll.CompareAndSwap(lastpoll, now)
- list := netpoll(0) // non-blocking - returns list of goroutines
+ list, delta := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
@@ -5608,6 +5613,7 @@ func sysmon() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
+ netpollAdjustWaiters(delta)
}
}
if GOOS == "netbsd" && needSysmonWorkaround {