diff options
author | Ian Lance Taylor <iant@golang.org> | 2023-07-19 16:09:35 -0700 |
---|---|---|
committer | Gopher Robot <gobot@golang.org> | 2023-07-20 15:45:57 +0000 |
commit | f51c55bfc382443b61ca3257c1ffea59eee2559f (patch) | |
tree | 646c5b640b8f420814331c164782c5000f3ecbe3 | |
parent | 890b96f7abd8ba5b2243959d9b49c212a0fc4d78 (diff) | |
download | go-f51c55bfc382443b61ca3257c1ffea59eee2559f.tar.gz go-f51c55bfc382443b61ca3257c1ffea59eee2559f.zip |
runtime: adjust netpollWaiters after goroutines are ready
The runtime was adjusting netpollWaiters before the waiting
goroutines were marked as ready. This could cause the scheduler
to report a deadlock because there were no goroutines ready to run.
Keeping netpollWaiters non-zero ensures that at least one goroutine
will call netpoll(-1) from findRunnable.
This does mean that if a program has network activity for a while
and then never has it again, and also has no timers, then we can leave
an M stranded in a call to netpoll from which it will never return.
At least this won't be a common case. And it's not new; this has been
a potential problem for some time.
Fixes #61454
Change-Id: I17c7f891c2bb1262fda12c6929664e64686463c8
Reviewed-on: https://go-review.googlesource.com/c/go/+/511455
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Auto-Submit: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
-rw-r--r-- | src/runtime/netpoll.go | 58 | ||||
-rw-r--r-- | src/runtime/netpoll_aix.go | 11 | ||||
-rw-r--r-- | src/runtime/netpoll_epoll.go | 11 | ||||
-rw-r--r-- | src/runtime/netpoll_fake.go | 4 | ||||
-rw-r--r-- | src/runtime/netpoll_kqueue.go | 11 | ||||
-rw-r--r-- | src/runtime/netpoll_solaris.go | 11 | ||||
-rw-r--r-- | src/runtime/netpoll_stub.go | 12 | ||||
-rw-r--r-- | src/runtime/netpoll_wasip1.go | 11 | ||||
-rw-r--r-- | src/runtime/netpoll_windows.go | 15 | ||||
-rw-r--r-- | src/runtime/proc.go | 22 |
10 files changed, 107 insertions, 59 deletions
diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go index 6877b2c350..9c2e40ce8a 100644 --- a/src/runtime/netpoll.go +++ b/src/runtime/netpoll.go @@ -26,10 +26,12 @@ import ( // func netpollclose(fd uintptr) int32 // Disable notifications for fd. Return an errno value. // -// func netpoll(delta int64) gList +// func netpoll(delta int64) (gList, int32) // Poll the network. If delta < 0, block indefinitely. If delta == 0, // poll without blocking. If delta > 0, block for up to delta nanoseconds. -// Return a list of goroutines built by calling netpollready. +// Return a list of goroutines built by calling netpollready, +// and a delta to add to netpollWaiters when all goroutines are ready. +// This will never return an empty list with a non-zero delta. // // func netpollBreak() // Wake up the network poller, assumed to be blocked in netpoll. @@ -426,12 +428,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { } // If we set the new deadline in the past, unblock currently pending IO if any. // Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd. + delta := int32(0) var rg, wg *g if pd.rd < 0 { - rg = netpollunblock(pd, 'r', false) + rg = netpollunblock(pd, 'r', false, &delta) } if pd.wd < 0 { - wg = netpollunblock(pd, 'w', false) + wg = netpollunblock(pd, 'w', false, &delta) } unlock(&pd.lock) if rg != nil { @@ -440,6 +443,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { if wg != nil { netpollgoready(wg, 3) } + netpollAdjustWaiters(delta) } //go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock @@ -453,8 +457,9 @@ func poll_runtime_pollUnblock(pd *pollDesc) { pd.wseq++ var rg, wg *g pd.publishInfo() - rg = netpollunblock(pd, 'r', false) - wg = netpollunblock(pd, 'w', false) + delta := int32(0) + rg = netpollunblock(pd, 'r', false, &delta) + wg = netpollunblock(pd, 'w', false, &delta) if pd.rt.f != nil { deltimer(&pd.rt) pd.rt.f = nil @@ -470,6 +475,7 @@ func poll_runtime_pollUnblock(pd *pollDesc) { if wg != nil { netpollgoready(wg, 3) } + netpollAdjustWaiters(delta) } // netpollready is called by the platform-specific netpoll function. @@ -478,16 +484,19 @@ func poll_runtime_pollUnblock(pd *pollDesc) { // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate // whether the fd is ready for reading or writing or both. // +// This returns a delta to apply to netpollWaiters. +// // This may run while the world is stopped, so write barriers are not allowed. // //go:nowritebarrier -func netpollready(toRun *gList, pd *pollDesc, mode int32) { +func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 { + delta := int32(0) var rg, wg *g if mode == 'r' || mode == 'r'+'w' { - rg = netpollunblock(pd, 'r', true) + rg = netpollunblock(pd, 'r', true, &delta) } if mode == 'w' || mode == 'r'+'w' { - wg = netpollunblock(pd, 'w', true) + wg = netpollunblock(pd, 'w', true, &delta) } if rg != nil { toRun.push(rg) @@ -495,6 +504,7 @@ func netpollready(toRun *gList, pd *pollDesc, mode int32) { if wg != nil { toRun.push(wg) } + return delta } func netpollcheckerr(pd *pollDesc, mode int32) int { @@ -520,7 +530,7 @@ func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { // Bump the count of goroutines waiting for the poller. // The scheduler uses this to decide whether to block // waiting for the poller if there is nothing else to do. - netpollWaiters.Add(1) + netpollAdjustWaiters(1) } return r } @@ -570,7 +580,13 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { return old == pdReady } -func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { +// netpollunblock moves either pd.rg (if mode == 'r') or +// pd.wg (if mode == 'w') into the pdReady state. +// This returns any goroutine blocked on pd.{rg,wg}. +// It adds any adjustment to netpollWaiters to *delta; +// this adjustment should be applied after the goroutine has +// been marked ready. +func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg @@ -594,7 +610,7 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { if old == pdWait { old = pdNil } else if old != pdNil { - netpollWaiters.Add(-1) + *delta -= 1 } return (*g)(unsafe.Pointer(old)) } @@ -614,6 +630,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { unlock(&pd.lock) return } + delta := int32(0) var rg *g if read { if pd.rd <= 0 || pd.rt.f == nil { @@ -621,7 +638,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { } pd.rd = -1 pd.publishInfo() - rg = netpollunblock(pd, 'r', false) + rg = netpollunblock(pd, 'r', false, &delta) } var wg *g if write { @@ -630,7 +647,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { } pd.wd = -1 pd.publishInfo() - wg = netpollunblock(pd, 'w', false) + wg = netpollunblock(pd, 'w', false, &delta) } unlock(&pd.lock) if rg != nil { @@ -639,6 +656,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { if wg != nil { netpollgoready(wg, 0) } + netpollAdjustWaiters(delta) } func netpollDeadline(arg any, seq uintptr) { @@ -653,6 +671,18 @@ func netpollWriteDeadline(arg any, seq uintptr) { netpolldeadlineimpl(arg.(*pollDesc), seq, false, true) } +// netpollAnyWaiters reports whether any goroutines are waiting for I/O. +func netpollAnyWaiters() bool { + return netpollWaiters.Load() > 0 +} + +// netpollAdjustWaiters adds delta to netpollWaiters. +func netpollAdjustWaiters(delta int32) { + if delta != 0 { + netpollWaiters.Add(delta) + } +} + func (c *pollCache) alloc() *pollDesc { lock(&c.lock) if c.first == nil { diff --git a/src/runtime/netpoll_aix.go b/src/runtime/netpoll_aix.go index fad976b932..a34b4d8bcf 100644 --- a/src/runtime/netpoll_aix.go +++ b/src/runtime/netpoll_aix.go @@ -154,13 +154,13 @@ func netpollBreak() { // delay > 0: block for up to that many nanoseconds // //go:nowritebarrierrec -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { var timeout uintptr if delay < 0 { timeout = ^uintptr(0) } else if delay == 0 { // TODO: call poll with timeout == 0 - return gList{} + return gList{}, 0 } else if delay < 1e6 { timeout = 1 } else if delay < 1e15 { @@ -186,7 +186,7 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if timeout > 0 { - return gList{} + return gList{}, 0 } goto retry } @@ -206,6 +206,7 @@ retry: n-- } var toRun gList + delta := int32(0) for i := 1; i < len(pfds) && n > 0; i++ { pfd := &pfds[i] @@ -220,10 +221,10 @@ retry: } if mode != 0 { pds[i].setEventErr(pfd.revents == _POLLERR, 0) - netpollready(&toRun, pds[i], mode) + delta += netpollready(&toRun, pds[i], mode) n-- } } unlock(&mtxset) - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go index e29b64dc9c..cda19fbc27 100644 --- a/src/runtime/netpoll_epoll.go +++ b/src/runtime/netpoll_epoll.go @@ -95,9 +95,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if epfd == -1 { - return gList{} + return gList{}, 0 } var waitms int32 if delay < 0 { @@ -124,11 +124,12 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if waitms > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := int32(0); i < n; i++ { ev := events[i] if ev.Events == 0 { @@ -164,9 +165,9 @@ retry: tag := tp.tag() if pd.fdseq.Load() == tag { pd.setEventErr(ev.Events == syscall.EPOLLERR, tag) - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_fake.go b/src/runtime/netpoll_fake.go index 5319561779..41f86a85e3 100644 --- a/src/runtime/netpoll_fake.go +++ b/src/runtime/netpoll_fake.go @@ -30,6 +30,6 @@ func netpollarm(pd *pollDesc, mode int) { func netpollBreak() { } -func netpoll(delay int64) gList { - return gList{} +func netpoll(delay int64) (gList, int32) { + return gList{}, 0 } diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go index 3af45e6892..33b9815965 100644 --- a/src/runtime/netpoll_kqueue.go +++ b/src/runtime/netpoll_kqueue.go @@ -118,9 +118,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if kq == -1 { - return gList{} + return gList{}, 0 } var tp *timespec var ts timespec @@ -147,11 +147,12 @@ retry: // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if delay > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := 0; i < int(n); i++ { ev := &events[i] @@ -208,8 +209,8 @@ retry: } } pd.setEventErr(ev.flags == _EV_ERROR, tag) - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go index 13c7ffc2ca..41f145c866 100644 --- a/src/runtime/netpoll_solaris.go +++ b/src/runtime/netpoll_solaris.go @@ -219,9 +219,9 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { if portfd == -1 { - return gList{} + return gList{}, 0 } var wait *timespec @@ -259,12 +259,13 @@ retry: // If a timed sleep was interrupted and there are no events, // just return to recalculate how long we should sleep now. if delay > 0 { - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := 0; i < int(n); i++ { ev := &events[i] @@ -324,9 +325,9 @@ retry: // about the event port on SmartOS. // // See golang.org/x/issue/30840. - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_stub.go b/src/runtime/netpoll_stub.go index 14cf0c327f..d950661acf 100644 --- a/src/runtime/netpoll_stub.go +++ b/src/runtime/netpoll_stub.go @@ -9,7 +9,6 @@ package runtime import "runtime/internal/atomic" var netpollInited atomic.Uint32 -var netpollWaiters atomic.Uint32 var netpollStubLock mutex var netpollNote note @@ -34,7 +33,7 @@ func netpollBreak() { // Polls for ready network connections. // Returns list of goroutines that become runnable. -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { // Implementation for platforms that do not support // integrated network poller. if delay != 0 { @@ -53,9 +52,16 @@ func netpoll(delay int64) gList { // (eg when running TestNetpollBreak). osyield() } - return gList{} + return gList{}, 0 } func netpollinited() bool { return netpollInited.Load() != 0 } + +func netpollAnyWaiters() bool { + return false +} + +func netpollAdjustWaiters(delta int32) { +} diff --git a/src/runtime/netpoll_wasip1.go b/src/runtime/netpoll_wasip1.go index 677287b30f..9903726809 100644 --- a/src/runtime/netpoll_wasip1.go +++ b/src/runtime/netpoll_wasip1.go @@ -184,7 +184,7 @@ func netpollclose(fd uintptr) int32 { func netpollBreak() {} -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { lock(&mtx) // If delay >= 0, we include a subscription of type Clock that we use as @@ -201,7 +201,7 @@ func netpoll(delay int64) gList { if len(pollsubs) == 0 { unlock(&mtx) - return gList{} + return gList{}, 0 } evts = evts[:len(pollsubs)] @@ -221,12 +221,13 @@ retry: // recalculate how long we should sleep now. if delay > 0 { unlock(&mtx) - return gList{} + return gList{}, 0 } goto retry } var toRun gList + delta := int32(0) for i := 0; i < int(nevents); i++ { e := &evts[i] if e.typ == eventtypeClock { @@ -245,10 +246,10 @@ retry: pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata))) netpolldisarm(pd, mode) pd.setEventErr(e.error != 0, 0) - netpollready(&toRun, pd, mode) + delta += netpollready(&toRun, pd, mode) } } unlock(&mtx) - return toRun + return toRun, delta } diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go index bb77d8d045..484a9e85b2 100644 --- a/src/runtime/netpoll_windows.go +++ b/src/runtime/netpoll_windows.go @@ -84,7 +84,7 @@ func netpollBreak() { // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds -func netpoll(delay int64) gList { +func netpoll(delay int64) (gList, int32) { var entries [64]overlappedEntry var wait, qty, flags, n, i uint32 var errno int32 @@ -94,7 +94,7 @@ func netpoll(delay int64) gList { mp := getg().m if iocphandle == _INVALID_HANDLE_VALUE { - return gList{} + return gList{}, 0 } if delay < 0 { wait = _INFINITE @@ -121,12 +121,13 @@ func netpoll(delay int64) gList { mp.blocked = false errno = int32(getlasterror()) if errno == _WAIT_TIMEOUT { - return gList{} + return gList{}, 0 } println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")") throw("runtime: netpoll failed") } mp.blocked = false + delta := int32(0) for i = 0; i < n; i++ { op = entries[i].op if op != nil && op.pd == entries[i].key { @@ -135,7 +136,7 @@ func netpoll(delay int64) gList { if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 { errno = int32(getlasterror()) } - handlecompletion(&toRun, op, errno, qty) + delta += handlecompletion(&toRun, op, errno, qty) } else { netpollWakeSig.Store(0) if delay == 0 { @@ -145,10 +146,10 @@ func netpoll(delay int64) gList { } } } - return toRun + return toRun, delta } -func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) { +func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) int32 { mode := op.mode if mode != 'r' && mode != 'w' { println("runtime: GetQueuedCompletionStatusEx returned invalid mode=", mode) @@ -156,5 +157,5 @@ func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) { } op.errno = errno op.qty = qty - netpollready(toRun, op.pd, mode) + return netpollready(toRun, op.pd, mode) } diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 9fd200ea32..f4af26f172 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -1435,8 +1435,9 @@ func startTheWorldWithSema() int64 { mp := acquirem() // disable preemption because it can be holding p in a local var if netpollinited() { - list := netpoll(0) // non-blocking + list, delta := netpoll(0) // non-blocking injectglist(&list) + netpollAdjustWaiters(delta) } lock(&sched.lock) @@ -2974,10 +2975,11 @@ top: // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. - if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 { - if list := netpoll(0); !list.empty() { // non-blocking + if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 { + if list, delta := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) + netpollAdjustWaiters(delta) casgstatus(gp, _Gwaiting, _Grunnable) if traceEnabled() { traceGoUnpark(gp, 0) @@ -3166,7 +3168,7 @@ top: } // Poll network until next timer. - if netpollinited() && (netpollWaiters.Load() > 0 || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 { + if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 { sched.pollUntil.Store(pollUntil) if mp.p != 0 { throw("findrunnable: netpoll with p") @@ -3188,7 +3190,7 @@ top: // When using fake time, just poll. delay = 0 } - list := netpoll(delay) // block until new work is available + list, delta := netpoll(delay) // block until new work is available // Refresh now again, after potentially blocking. now = nanotime() sched.pollUntil.Store(0) @@ -3204,11 +3206,13 @@ top: unlock(&sched.lock) if pp == nil { injectglist(&list) + netpollAdjustWaiters(delta) } else { acquirep(pp) if !list.empty() { gp := list.pop() injectglist(&list) + netpollAdjustWaiters(delta) casgstatus(gp, _Gwaiting, _Grunnable) if traceEnabled() { traceGoUnpark(gp, 0) @@ -3242,9 +3246,10 @@ func pollWork() bool { if !runqempty(p) { return true } - if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 { - if list := netpoll(0); !list.empty() { + if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 { + if list, delta := netpoll(0); !list.empty() { injectglist(&list) + netpollAdjustWaiters(delta) return true } } @@ -5596,7 +5601,7 @@ func sysmon() { lastpoll := sched.lastpoll.Load() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { sched.lastpoll.CompareAndSwap(lastpoll, now) - list := netpoll(0) // non-blocking - returns list of goroutines + list, delta := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. @@ -5608,6 +5613,7 @@ func sysmon() { incidlelocked(-1) injectglist(&list) incidlelocked(1) + netpollAdjustWaiters(delta) } } if GOOS == "netbsd" && needSysmonWorkaround { |