aboutsummaryrefslogtreecommitdiff
path: root/src/runtime
diff options
context:
space:
mode:
authorMichael Anthony Knyszek <mknyszek@google.com>2024-05-06 15:52:09 +0000
committerMichael Knyszek <mknyszek@google.com>2024-05-17 19:46:10 +0000
commitdfe781e1ebbb1b14f3c76c1ee730c09e27369062 (patch)
treed1d6483ae6303c05ef20a1a768216ff5072bf6fe /src/runtime
parent5890b023a549e7ba6b0c563cdf730a91c2de6fae (diff)
downloadgo-dfe781e1ebbb1b14f3c76c1ee730c09e27369062.tar.gz
go-dfe781e1ebbb1b14f3c76c1ee730c09e27369062.zip
runtime: fix coro interactions with thread-locked goroutines
This change fixes problems with thread-locked goroutines using newcoro/coroswitch/etc. Currently, the coro paths do not consider thread-locked goroutines at all and can quickly result in broken scheduler state or lost/leaked goroutines. One possible fix to these issues is to fall back on goroutine+channel semantics, but that turns out to be fairly complicated to implement and results in significant performance cliffs. More complex thread-lock state donation tricks also result in some fairly complicated state tracking that doesn't seem worth it given the use-cases of iter.Pull (and even then, there will be performance cliffs). This change implements a much simpler, but more restrictive semantics. In particular, thread-lock state is tied to the coro at the first call to newcoro (i.e. iter.Pull). From then on, the invariant is that if the coro has any thread-lock state *or* a goroutine calling into coroswitch has any thread-lock state, that the full gamut of thread-lock state must remain the same as it was when newcoro was called (the full gamut meaning internal and external lock counts as well as the identity of the thread that was locked to). This semantics allows the common cases to be always fast, but comes with a non-orthogonality caveat. Specifically, when iter.Pull is used in conjunction with thread-locked goroutines, complex cases (passing next between goroutines or passing yield between goroutines) are likely to fail. Simple cases, where any number of iter.Pull iterators are used in a straightforward way (nested, in series, etc.) from the same goroutine, will work and will be guaranteed to be fast regardless of thread-lock state. This is a compromise for the near-term and we may consider lifting the restrictions imposed by this CL in the future. Fixes #65889. Fixes #65946. Change-Id: I3fb5791e36a61f5ded50226a229a79d28739b24e Reviewed-on: https://go-review.googlesource.com/c/go/+/583675 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: David Chase <drchase@google.com> Reviewed-by: Austin Clements <austin@google.com>
Diffstat (limited to 'src/runtime')
-rw-r--r--src/runtime/coro.go64
-rw-r--r--src/runtime/coro_test.go71
-rw-r--r--src/runtime/crash_test.go18
-rw-r--r--src/runtime/proc.go6
-rw-r--r--src/runtime/testdata/testprog/coro.go185
-rw-r--r--src/runtime/testdata/testprogcgo/coro.go185
6 files changed, 516 insertions, 13 deletions
diff --git a/src/runtime/coro.go b/src/runtime/coro.go
index b2bc801940..3d39d13493 100644
--- a/src/runtime/coro.go
+++ b/src/runtime/coro.go
@@ -24,6 +24,11 @@ import "unsafe"
type coro struct {
gp guintptr
f func(*coro)
+
+ // State for validating thread-lock interactions.
+ mp *m
+ lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
+ lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}
//go:linkname newcoro
@@ -37,9 +42,18 @@ func newcoro(f func(*coro)) *coro {
pc := getcallerpc()
gp := getg()
systemstack(func() {
+ mp := gp.m
start := corostart
startfv := *(**funcval)(unsafe.Pointer(&start))
gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)
+
+ // Scribble down locked thread state if needed and/or donate
+ // thread-lock state to the new goroutine.
+ if mp.lockedExt+mp.lockedInt != 0 {
+ c.mp = mp
+ c.lockedExt = mp.lockedExt
+ c.lockedInt = mp.lockedInt
+ }
})
gp.coroarg = c
c.gp.set(gp)
@@ -90,17 +104,28 @@ func coroswitch(c *coro) {
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
- // TODO(go.dev/issue/65889): Something really nasty will happen if either
- // goroutine in this handoff tries to lock itself to an OS thread.
- // There's an explicit multiplexing going on here that needs to be
- // disabled if either the consumer or the iterator ends up in such
- // a state.
c := gp.coroarg
gp.coroarg = nil
exit := gp.coroexit
gp.coroexit = false
mp := gp.m
+ // Track and validate thread-lock interactions.
+ //
+ // The rules with thread-lock interactions are simple. When a coro goroutine is switched to,
+ // the same thread must be used, and the locked state must match with the thread-lock state of
+ // the goroutine which called newcoro. Thread-lock state consists of the thread and the number
+ // of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
+ locked := gp.lockedm != 0
+ if c.mp != nil || locked {
+ if mp != c.mp || mp.lockedInt != c.lockedInt || mp.lockedExt != c.lockedExt {
+ print("coro: got thread ", unsafe.Pointer(mp), ", want ", unsafe.Pointer(c.mp), "\n")
+ print("coro: got lock internal ", mp.lockedInt, ", want ", c.lockedInt, "\n")
+ print("coro: got lock external ", mp.lockedExt, ", want ", c.lockedExt, "\n")
+ throw("coro: OS thread locking must match locking at coroutine creation")
+ }
+ }
+
// Acquire tracer for writing for the duration of this call.
//
// There's a lot of state manipulation performed with shortcuts
@@ -109,11 +134,18 @@ func coroswitch_m(gp *g) {
// emitting an event for every single transition.
trace := traceAcquire()
+ if locked {
+ // Detach the goroutine from the thread; we'll attach to the goroutine we're
+ // switching to before returning.
+ gp.lockedm.set(nil)
+ }
+
if exit {
- // TODO(65889): If we're locked to the current OS thread and
- // we exit here while tracing is enabled, we're going to end up
- // in a really bad place (traceAcquire also calls acquirem; there's
- // no releasem before the thread exits).
+ // The M might have a non-zero OS thread lock count when we get here, gdestroy
+ // will avoid destroying the M if the G isn't explicitly locked to it via lockedm,
+ // which we cleared above. It's fine to gdestroy here also, even when locked to
+ // the thread, because we'll be switching back to another goroutine anyway, which
+ // will take back its thread-lock state before returning.
gdestroy(gp)
gp = nil
} else {
@@ -156,6 +188,14 @@ func coroswitch_m(gp *g) {
}
}
+ // Check if we're switching to ourselves. This case is able to break our
+ // thread-lock invariants and an unbuffered channel implementation of
+ // coroswitch would deadlock. It's clear that this case should just not
+ // work.
+ if gnext == gp {
+ throw("coroswitch of a goroutine to itself")
+ }
+
// Emit the trace event after getting gnext but before changing curg.
// GoSwitch expects that the current G is running and that we haven't
// switched yet for correct status emission.
@@ -175,6 +215,12 @@ func coroswitch_m(gp *g) {
casgstatus(gnext, _Grunnable, _Grunning)
}
+ // Donate locked state.
+ if locked {
+ mp.lockedg.set(gnext)
+ gnext.lockedm.set(mp)
+ }
+
// Release the trace locker. We've completed all the necessary transitions..
if trace.ok() {
traceRelease(trace)
diff --git a/src/runtime/coro_test.go b/src/runtime/coro_test.go
new file mode 100644
index 0000000000..eeb7f256f1
--- /dev/null
+++ b/src/runtime/coro_test.go
@@ -0,0 +1,71 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime_test
+
+import (
+ "runtime"
+ "strings"
+ "testing"
+)
+
+func TestCoroLockOSThread(t *testing.T) {
+ for _, test := range []string{
+ "CoroLockOSThreadIterLock",
+ "CoroLockOSThreadIterLockYield",
+ "CoroLockOSThreadLock",
+ "CoroLockOSThreadLockIterNested",
+ "CoroLockOSThreadLockIterLock",
+ "CoroLockOSThreadLockIterLockYield",
+ "CoroLockOSThreadLockIterYieldNewG",
+ "CoroLockOSThreadLockAfterPull",
+ "CoroLockOSThreadStopLocked",
+ "CoroLockOSThreadStopLockedIterNested",
+ } {
+ t.Run(test, func(t *testing.T) {
+ checkCoroTestProgOutput(t, runTestProg(t, "testprog", test))
+ })
+ }
+}
+
+func TestCoroCgoCallback(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("coro cgo callback tests not supported on Windows")
+ }
+ for _, test := range []string{
+ "CoroCgoIterCallback",
+ "CoroCgoIterCallbackYield",
+ "CoroCgoCallback",
+ "CoroCgoCallbackIterNested",
+ "CoroCgoCallbackIterCallback",
+ "CoroCgoCallbackIterCallbackYield",
+ "CoroCgoCallbackAfterPull",
+ "CoroCgoStopCallback",
+ "CoroCgoStopCallbackIterNested",
+ } {
+ t.Run(test, func(t *testing.T) {
+ checkCoroTestProgOutput(t, runTestProg(t, "testprogcgo", test))
+ })
+ }
+}
+
+func checkCoroTestProgOutput(t *testing.T, output string) {
+ t.Helper()
+
+ c := strings.SplitN(output, "\n", 2)
+ if len(c) == 1 {
+ t.Fatalf("expected at least one complete line in the output, got:\n%s", output)
+ }
+ expect, ok := strings.CutPrefix(c[0], "expect: ")
+ if !ok {
+ t.Fatalf("expected first line of output to start with \"expect: \", got: %q", c[0])
+ }
+ rest := c[1]
+ if expect == "OK" && rest != "OK\n" {
+ t.Fatalf("expected just 'OK' in the output, got:\n%s", rest)
+ }
+ if !strings.Contains(rest, expect) {
+ t.Fatalf("expected %q in the output, got:\n%s", expect, rest)
+ }
+}
diff --git a/src/runtime/crash_test.go b/src/runtime/crash_test.go
index 7471977a36..69e1034ff8 100644
--- a/src/runtime/crash_test.go
+++ b/src/runtime/crash_test.go
@@ -168,7 +168,23 @@ func buildTestProg(t *testing.T, binary string, flags ...string) (string, error)
cmd := exec.Command(testenv.GoToolPath(t), append([]string{"build", "-o", exe}, flags...)...)
t.Logf("running %v", cmd)
cmd.Dir = "testdata/" + binary
- out, err := testenv.CleanCmdEnv(cmd).CombinedOutput()
+ cmd = testenv.CleanCmdEnv(cmd)
+
+ // Add the rangefunc GOEXPERIMENT unconditionally since some tests depend on it.
+ // TODO(61405): Remove this once it's enabled by default.
+ edited := false
+ for i := range cmd.Env {
+ e := cmd.Env[i]
+ if _, vars, ok := strings.Cut(e, "GOEXPERIMENT="); ok {
+ cmd.Env[i] = "GOEXPERIMENT=" + vars + ",rangefunc"
+ edited = true
+ }
+ }
+ if !edited {
+ cmd.Env = append(cmd.Env, "GOEXPERIMENT=rangefunc")
+ }
+
+ out, err := cmd.CombinedOutput()
if err != nil {
target.err = fmt.Errorf("building %s %v: %v\n%s", binary, flags, err, out)
} else {
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 67cd6aea78..418f1c5a66 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -4217,9 +4217,9 @@ func gdestroy(gp *g) {
return
}
- if mp.lockedInt != 0 {
- print("invalid m->lockedInt = ", mp.lockedInt, "\n")
- throw("internal lockOSThread error")
+ if locked && mp.lockedInt != 0 {
+ print("runtime: mp.lockedInt = ", mp.lockedInt, "\n")
+ throw("exited a goroutine internally locked to the OS thread")
}
gfput(pp, gp)
if locked {
diff --git a/src/runtime/testdata/testprog/coro.go b/src/runtime/testdata/testprog/coro.go
new file mode 100644
index 0000000000..032215b801
--- /dev/null
+++ b/src/runtime/testdata/testprog/coro.go
@@ -0,0 +1,185 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build goexperiment.rangefunc
+
+package main
+
+import (
+ "fmt"
+ "iter"
+ "runtime"
+)
+
+func init() {
+ register("CoroLockOSThreadIterLock", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerExhaust, iterLock)
+ })
+ register("CoroLockOSThreadIterLockYield", func() {
+ println("expect: OS thread locking must match")
+ CoroLockOSThread(callerExhaust, iterLockYield)
+ })
+ register("CoroLockOSThreadLock", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerExhaustLocked, iterSimple)
+ })
+ register("CoroLockOSThreadLockIterNested", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerExhaustLocked, iterNested)
+ })
+ register("CoroLockOSThreadLockIterLock", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerExhaustLocked, iterLock)
+ })
+ register("CoroLockOSThreadLockIterLockYield", func() {
+ println("expect: OS thread locking must match")
+ CoroLockOSThread(callerExhaustLocked, iterLockYield)
+ })
+ register("CoroLockOSThreadLockIterYieldNewG", func() {
+ println("expect: OS thread locking must match")
+ CoroLockOSThread(callerExhaustLocked, iterYieldNewG)
+ })
+ register("CoroLockOSThreadLockAfterPull", func() {
+ println("expect: OS thread locking must match")
+ CoroLockOSThread(callerLockAfterPull, iterSimple)
+ })
+ register("CoroLockOSThreadStopLocked", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerStopLocked, iterSimple)
+ })
+ register("CoroLockOSThreadStopLockedIterNested", func() {
+ println("expect: OK")
+ CoroLockOSThread(callerStopLocked, iterNested)
+ })
+}
+
+func CoroLockOSThread(driver func(iter.Seq[int]) error, seq iter.Seq[int]) {
+ if err := driver(seq); err != nil {
+ println("error:", err.Error())
+ return
+ }
+ println("OK")
+}
+
+func callerExhaust(i iter.Seq[int]) error {
+ next, _ := iter.Pull(i)
+ for {
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ return fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ return nil
+}
+
+func callerExhaustLocked(i iter.Seq[int]) error {
+ runtime.LockOSThread()
+ next, _ := iter.Pull(i)
+ for {
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ return fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ runtime.UnlockOSThread()
+ return nil
+}
+
+func callerLockAfterPull(i iter.Seq[int]) error {
+ n := 0
+ next, _ := iter.Pull(i)
+ for {
+ runtime.LockOSThread()
+ n++
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ return fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ for range n {
+ runtime.UnlockOSThread()
+ }
+ return nil
+}
+
+func callerStopLocked(i iter.Seq[int]) error {
+ runtime.LockOSThread()
+ next, stop := iter.Pull(i)
+ v, _ := next()
+ stop()
+ if v != 5 {
+ return fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ runtime.UnlockOSThread()
+ return nil
+}
+
+func iterSimple(yield func(int) bool) {
+ for range 3 {
+ if !yield(5) {
+ return
+ }
+ }
+}
+
+func iterNested(yield func(int) bool) {
+ next, stop := iter.Pull(iterSimple)
+ for {
+ v, ok := next()
+ if ok {
+ if !yield(v) {
+ stop()
+ }
+ } else {
+ return
+ }
+ }
+}
+
+func iterLock(yield func(int) bool) {
+ for range 3 {
+ runtime.LockOSThread()
+ runtime.UnlockOSThread()
+
+ if !yield(5) {
+ return
+ }
+ }
+}
+
+func iterLockYield(yield func(int) bool) {
+ for range 3 {
+ runtime.LockOSThread()
+ ok := yield(5)
+ runtime.UnlockOSThread()
+ if !ok {
+ return
+ }
+ }
+}
+
+func iterYieldNewG(yield func(int) bool) {
+ for range 3 {
+ done := make(chan struct{})
+ var ok bool
+ go func() {
+ ok = yield(5)
+ done <- struct{}{}
+ }()
+ <-done
+ if !ok {
+ return
+ }
+ }
+}
diff --git a/src/runtime/testdata/testprogcgo/coro.go b/src/runtime/testdata/testprogcgo/coro.go
new file mode 100644
index 0000000000..e0cb945112
--- /dev/null
+++ b/src/runtime/testdata/testprogcgo/coro.go
@@ -0,0 +1,185 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build goexperiment.rangefunc && !windows
+
+package main
+
+/*
+#include <stdint.h> // for uintptr_t
+
+void go_callback_coro(uintptr_t handle);
+
+static void call_go(uintptr_t handle) {
+ go_callback_coro(handle);
+}
+*/
+import "C"
+
+import (
+ "fmt"
+ "iter"
+ "runtime/cgo"
+)
+
+func init() {
+ register("CoroCgoIterCallback", func() {
+ println("expect: OK")
+ CoroCgo(callerExhaust, iterCallback)
+ })
+ register("CoroCgoIterCallbackYield", func() {
+ println("expect: OS thread locking must match")
+ CoroCgo(callerExhaust, iterCallbackYield)
+ })
+ register("CoroCgoCallback", func() {
+ println("expect: OK")
+ CoroCgo(callerExhaustCallback, iterSimple)
+ })
+ register("CoroCgoCallbackIterNested", func() {
+ println("expect: OK")
+ CoroCgo(callerExhaustCallback, iterNested)
+ })
+ register("CoroCgoCallbackIterCallback", func() {
+ println("expect: OK")
+ CoroCgo(callerExhaustCallback, iterCallback)
+ })
+ register("CoroCgoCallbackIterCallbackYield", func() {
+ println("expect: OS thread locking must match")
+ CoroCgo(callerExhaustCallback, iterCallbackYield)
+ })
+ register("CoroCgoCallbackAfterPull", func() {
+ println("expect: OS thread locking must match")
+ CoroCgo(callerCallbackAfterPull, iterSimple)
+ })
+ register("CoroCgoStopCallback", func() {
+ println("expect: OK")
+ CoroCgo(callerStopCallback, iterSimple)
+ })
+ register("CoroCgoStopCallbackIterNested", func() {
+ println("expect: OK")
+ CoroCgo(callerStopCallback, iterNested)
+ })
+}
+
+var toCall func()
+
+//export go_callback_coro
+func go_callback_coro(handle C.uintptr_t) {
+ h := cgo.Handle(handle)
+ h.Value().(func())()
+ h.Delete()
+}
+
+func callFromC(f func()) {
+ C.call_go(C.uintptr_t(cgo.NewHandle(f)))
+}
+
+func CoroCgo(driver func(iter.Seq[int]) error, seq iter.Seq[int]) {
+ if err := driver(seq); err != nil {
+ println("error:", err.Error())
+ return
+ }
+ println("OK")
+}
+
+func callerExhaust(i iter.Seq[int]) error {
+ next, _ := iter.Pull(i)
+ for {
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ return fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ return nil
+}
+
+func callerExhaustCallback(i iter.Seq[int]) (err error) {
+ callFromC(func() {
+ next, _ := iter.Pull(i)
+ for {
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ err = fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ })
+ return err
+}
+
+func callerStopCallback(i iter.Seq[int]) (err error) {
+ callFromC(func() {
+ next, stop := iter.Pull(i)
+ v, _ := next()
+ stop()
+ if v != 5 {
+ err = fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ })
+ return err
+}
+
+func callerCallbackAfterPull(i iter.Seq[int]) (err error) {
+ next, _ := iter.Pull(i)
+ callFromC(func() {
+ for {
+ v, ok := next()
+ if !ok {
+ break
+ }
+ if v != 5 {
+ err = fmt.Errorf("bad iterator: wanted value %d, got %d", 5, v)
+ }
+ }
+ })
+ return err
+}
+
+func iterSimple(yield func(int) bool) {
+ for range 3 {
+ if !yield(5) {
+ return
+ }
+ }
+}
+
+func iterNested(yield func(int) bool) {
+ next, stop := iter.Pull(iterSimple)
+ for {
+ v, ok := next()
+ if ok {
+ if !yield(v) {
+ stop()
+ }
+ } else {
+ return
+ }
+ }
+}
+
+func iterCallback(yield func(int) bool) {
+ for range 3 {
+ callFromC(func() {})
+ if !yield(5) {
+ return
+ }
+ }
+}
+
+func iterCallbackYield(yield func(int) bool) {
+ for range 3 {
+ var ok bool
+ callFromC(func() {
+ ok = yield(5)
+ })
+ if !ok {
+ return
+ }
+ }
+}