aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris O'Hara <cohara87@gmail.com>2023-05-08 17:07:18 +1000
committerGopher Robot <gobot@golang.org>2023-05-25 00:12:40 +0000
commitc5c2184538411c8cf7abc4e536fbe7af8b0307f5 (patch)
tree8cf138711de09b7d23c1a489482cdd7773af5858
parent04c628935d1487632f903e4e0688fb7a34063752 (diff)
downloadgo-c5c2184538411c8cf7abc4e536fbe7af8b0307f5.tar.gz
go-c5c2184538411c8cf7abc4e536fbe7af8b0307f5.zip
runtime: implement wasip1 netpoll
Implements netpoll using WASI's poll_oneoff system call. This enables non-blocking I/O support for wasip1. Change-Id: Ie395fa49d651c8b8262d485e2847dd65b0a10bc6 Reviewed-on: https://go-review.googlesource.com/c/go/+/493357 Reviewed-by: Dmitri Shuralyov <dmitshur@google.com> Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Matthew Dempsky <mdempsky@google.com> Reviewed-by: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com> Reviewed-by: Julien Fabre <ju.pryz@gmail.com> Auto-Submit: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com> Run-TryBot: Ian Lance Taylor <iant@golang.org>
-rwxr-xr-xmisc/wasm/go_wasip1_wasm_exec4
-rw-r--r--src/cmd/dist/test.go13
-rw-r--r--src/internal/poll/errno_unix.go2
-rw-r--r--src/internal/poll/fd_poll_js.go (renamed from src/internal/poll/fd_poll_wasm.go)2
-rw-r--r--src/internal/poll/fd_poll_runtime.go2
-rw-r--r--src/runtime/internal/wasitest/host_test.go14
-rw-r--r--src/runtime/internal/wasitest/nonblock_test.go96
-rw-r--r--src/runtime/internal/wasitest/testdata/nonblock.go48
-rw-r--r--src/runtime/netpoll.go4
-rw-r--r--src/runtime/netpoll_fake.go6
-rw-r--r--src/runtime/netpoll_wasip1.go254
-rw-r--r--src/runtime/os_wasip1.go8
12 files changed, 442 insertions, 11 deletions
diff --git a/misc/wasm/go_wasip1_wasm_exec b/misc/wasm/go_wasip1_wasm_exec
index 72228d0501..abcac8df36 100755
--- a/misc/wasm/go_wasip1_wasm_exec
+++ b/misc/wasm/go_wasip1_wasm_exec
@@ -5,10 +5,10 @@
case "$GOWASIRUNTIME" in
"wasmedge")
- exec wasmedge --dir=/ --env PWD="$PWD" "$1" "${@:2}"
+ exec wasmedge --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" "${@:2}"
;;
"wasmer")
- exec wasmer run --dir=/ --env PWD="$PWD" "$1" -- "${@:2}"
+ exec wasmer run --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" -- "${@:2}"
;;
"wasmtime")
exec wasmtime run --dir=/ --env PWD="$PWD" --max-wasm-stack 1048576 "$1" -- "${@:2}"
diff --git a/src/cmd/dist/test.go b/src/cmd/dist/test.go
index 3384149391..f16bf32bbf 100644
--- a/src/cmd/dist/test.go
+++ b/src/cmd/dist/test.go
@@ -574,7 +574,8 @@ func (t *tester) registerTests() {
// registerStdTestSpecially tracks import paths in the standard library
// whose test registration happens in a special way.
registerStdTestSpecially := map[string]bool{
- "cmd/internal/testdir": true, // Registered at the bottom with sharding.
+ "runtime/internal/wasitest": true, // Registered at the bottom as a host test.
+ "cmd/internal/testdir": true, // Registered at the bottom with sharding.
}
// Fast path to avoid the ~1 second of `go list std cmd` when
@@ -786,6 +787,16 @@ func (t *tester) registerTests() {
t.registerCgoTests(cgoHeading)
}
+ if goos == "wasip1" {
+ t.registerTest("wasip1 host tests",
+ &goTest{
+ variant: "host",
+ pkg: "runtime/internal/wasitest",
+ timeout: 1 * time.Minute,
+ runOnHost: true,
+ })
+ }
+
if goos != "android" && !t.iOS() {
// Only start multiple test dir shards on builders,
// where they get distributed to multiple machines.
diff --git a/src/internal/poll/errno_unix.go b/src/internal/poll/errno_unix.go
index 8eed93a31c..d1a18abda4 100644
--- a/src/internal/poll/errno_unix.go
+++ b/src/internal/poll/errno_unix.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-//go:build unix
+//go:build unix || wasip1
package poll
diff --git a/src/internal/poll/fd_poll_wasm.go b/src/internal/poll/fd_poll_js.go
index b5158eba30..fe5e73a149 100644
--- a/src/internal/poll/fd_poll_wasm.go
+++ b/src/internal/poll/fd_poll_js.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-//go:build (js && wasm) || wasip1
+//go:build js && wasm
package poll
diff --git a/src/internal/poll/fd_poll_runtime.go b/src/internal/poll/fd_poll_runtime.go
index 0a2e76d73f..b51535ecf2 100644
--- a/src/internal/poll/fd_poll_runtime.go
+++ b/src/internal/poll/fd_poll_runtime.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-//go:build unix || windows
+//go:build unix || windows || wasip1
package poll
diff --git a/src/runtime/internal/wasitest/host_test.go b/src/runtime/internal/wasitest/host_test.go
new file mode 100644
index 0000000000..ca4ef8f4e8
--- /dev/null
+++ b/src/runtime/internal/wasitest/host_test.go
@@ -0,0 +1,14 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package wasi_test
+
+import "flag"
+
+var target string
+
+func init() {
+ // The dist test runner passes -target when running this as a host test.
+ flag.StringVar(&target, "target", "", "")
+}
diff --git a/src/runtime/internal/wasitest/nonblock_test.go b/src/runtime/internal/wasitest/nonblock_test.go
new file mode 100644
index 0000000000..887baab33f
--- /dev/null
+++ b/src/runtime/internal/wasitest/nonblock_test.go
@@ -0,0 +1,96 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package wasi_test
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "math/rand"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "syscall"
+ "testing"
+)
+
+// This test creates a set of FIFOs and writes to them in reverse order. It
+// checks that the output order matches the write order. The test binary opens
+// the FIFOs in their original order and spawns a goroutine for each that reads
+// from the FIFO and writes the result to stderr. If I/O was blocking, all
+// goroutines would be blocked waiting for one read call to return, and the
+// output order wouldn't match.
+
+type fifo struct {
+ file *os.File
+ path string
+}
+
+func TestNonblock(t *testing.T) {
+ if target != "wasip1/wasm" {
+ t.Skip()
+ }
+
+ switch os.Getenv("GOWASIRUNTIME") {
+ case "wazero", "":
+ t.Skip("wazero does not support non-blocking I/O")
+ case "wasmer":
+ t.Skip("wasmer does not support non-blocking I/O")
+ }
+
+ args := []string{"run", "./testdata/nonblock.go"}
+
+ fifos := make([]*fifo, 8)
+ for i := range fifos {
+ path := filepath.Join(t.TempDir(), fmt.Sprintf("wasip1-nonblock-fifo-%d-%d", rand.Uint32(), i))
+ if err := syscall.Mkfifo(path, 0666); err != nil {
+ t.Fatal(err)
+ }
+
+ file, err := os.OpenFile(path, os.O_RDWR, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer file.Close()
+
+ args = append(args, path)
+ fifos[len(fifos)-i-1] = &fifo{file, path}
+ }
+
+ subProcess := exec.Command("go", args...)
+
+ subProcess.Env = append(os.Environ(), "GOOS=wasip1", "GOARCH=wasm")
+
+ pr, pw := io.Pipe()
+ defer pw.Close()
+
+ subProcess.Stderr = pw
+
+ if err := subProcess.Start(); err != nil {
+ t.Fatal(err)
+ }
+
+ scanner := bufio.NewScanner(pr)
+ if !scanner.Scan() {
+ t.Fatal("expected line:", scanner.Err())
+ } else if scanner.Text() != "waiting" {
+ t.Fatal("unexpected output:", scanner.Text())
+ }
+
+ for _, fifo := range fifos {
+ if _, err := fifo.file.WriteString(fifo.path + "\n"); err != nil {
+ t.Fatal(err)
+ }
+ if !scanner.Scan() {
+ t.Fatal("expected line:", scanner.Err())
+ } else if scanner.Text() != fifo.path {
+ t.Fatal("unexpected line:", scanner.Text())
+ }
+ }
+
+ if err := subProcess.Wait(); err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/src/runtime/internal/wasitest/testdata/nonblock.go b/src/runtime/internal/wasitest/testdata/nonblock.go
new file mode 100644
index 0000000000..947abe7fcf
--- /dev/null
+++ b/src/runtime/internal/wasitest/testdata/nonblock.go
@@ -0,0 +1,48 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "os"
+ "sync"
+)
+
+func main() {
+ ready := make(chan struct{})
+
+ var wg sync.WaitGroup
+ for _, path := range os.Args[1:] {
+ f, err := os.Open(path)
+ if err != nil {
+ panic(err)
+ }
+
+ spawnWait := make(chan struct{})
+
+ wg.Add(1)
+ go func(f *os.File) {
+ defer f.Close()
+ defer wg.Done()
+
+ close(spawnWait)
+
+ <-ready
+
+ var buf [256]byte
+ n, err := f.Read(buf[:])
+ if err != nil {
+ panic(err)
+ }
+ os.Stderr.Write(buf[:n])
+ }(f)
+
+ // Spawn one goroutine at a time.
+ <-spawnWait
+ }
+
+ println("waiting")
+ close(ready)
+ wg.Wait()
+}
diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go
index 3e6a6961e3..9b54e8e21f 100644
--- a/src/runtime/netpoll.go
+++ b/src/runtime/netpoll.go
@@ -336,8 +336,8 @@ func poll_runtime_pollWait(pd *pollDesc, mode int) int {
if errcode != pollNoError {
return errcode
}
- // As for now only Solaris, illumos, and AIX use level-triggered IO.
- if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
+ // As for now only Solaris, illumos, AIX and wasip1 use level-triggered IO.
+ if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
diff --git a/src/runtime/netpoll_fake.go b/src/runtime/netpoll_fake.go
index 5782c78515..5319561779 100644
--- a/src/runtime/netpoll_fake.go
+++ b/src/runtime/netpoll_fake.go
@@ -2,10 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Fake network poller for js/wasm and wasip1/wasm.
-// Should never be used, because wasm network connections do not honor "SetNonblock".
+// Fake network poller for js/wasm.
+// Should never be used, because js/wasm network connections do not honor "SetNonblock".
-//go:build (js && wasm) || wasip1
+//go:build js && wasm
package runtime
diff --git a/src/runtime/netpoll_wasip1.go b/src/runtime/netpoll_wasip1.go
new file mode 100644
index 0000000000..677287b30f
--- /dev/null
+++ b/src/runtime/netpoll_wasip1.go
@@ -0,0 +1,254 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build wasip1
+
+package runtime
+
+import "unsafe"
+
+// WASI network poller.
+//
+// WASI preview 1 includes a poll_oneoff host function that behaves similarly
+// to poll(2) on Linux. Like poll(2), poll_oneoff is level triggered. It
+// accepts one or more subscriptions to FD read or write events.
+//
+// Major differences to poll(2):
+// - the events are not written to the input entries (like pollfd.revents), and
+// instead are appended to a separate events buffer. poll_oneoff writes zero
+// or more events to the buffer (at most one per input subscription) and
+// returns the number of events written. Although the index of the
+// subscriptions might not match the index of the associated event in the
+// events buffer, both the subscription and event structs contain a userdata
+// field and when a subscription yields an event the userdata fields will
+// match.
+// - there's no explicit timeout parameter, although a time limit can be added
+// by using "clock" subscriptions.
+// - each FD subscription can either be for a read or a write, but not both.
+// This is in contrast to poll(2) which accepts a mask with POLLIN and
+// POLLOUT bits, allowing for a subscription to either, neither, or both
+// reads and writes.
+//
+// Since poll_oneoff is similar to poll(2), the implementation here was derived
+// from netpoll_aix.go.
+
+const _EINTR = 27
+
+var (
+ evts []event
+ subs []subscription
+ pds []*pollDesc
+ mtx mutex
+)
+
+func netpollinit() {
+ // Unlike poll(2), WASI's poll_oneoff doesn't accept a timeout directly. To
+ // prevent it from blocking indefinitely, a clock subscription with a
+ // timeout field needs to be submitted. Reserve a slot here for the clock
+ // subscription, and set fields that won't change between poll_oneoff calls.
+
+ subs = make([]subscription, 1, 128)
+ evts = make([]event, 0, 128)
+ pds = make([]*pollDesc, 0, 128)
+
+ timeout := &subs[0]
+ eventtype := timeout.u.eventtype()
+ *eventtype = eventtypeClock
+ clock := timeout.u.subscriptionClock()
+ clock.id = clockMonotonic
+ clock.precision = 1e3
+}
+
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return false
+}
+
+func netpollopen(fd uintptr, pd *pollDesc) int32 {
+ lock(&mtx)
+
+ // We don't worry about pd.fdseq here,
+ // as mtx protects us from stale pollDescs.
+
+ pds = append(pds, pd)
+
+ // The 32-bit pd.user field holds the index of the read subscription in the
+ // upper 16 bits, and index of the write subscription in the lower bits.
+ // A disarmed=^uint16(0) sentinel is used to represent no subscription.
+ // There is thus a maximum of 65535 total subscriptions.
+ pd.user = uint32(disarmed)<<16 | uint32(disarmed)
+
+ unlock(&mtx)
+ return 0
+}
+
+const disarmed = 0xFFFF
+
+func netpollarm(pd *pollDesc, mode int) {
+ lock(&mtx)
+
+ var s subscription
+
+ s.userdata = userdata(uintptr(unsafe.Pointer(pd)))
+
+ fdReadwrite := s.u.subscriptionFdReadwrite()
+ fdReadwrite.fd = int32(pd.fd)
+
+ ridx := int(pd.user >> 16)
+ widx := int(pd.user & 0xFFFF)
+
+ if (mode == 'r' && ridx != disarmed) || (mode == 'w' && widx != disarmed) {
+ unlock(&mtx)
+ return
+ }
+
+ eventtype := s.u.eventtype()
+ switch mode {
+ case 'r':
+ *eventtype = eventtypeFdRead
+ ridx = len(subs)
+ case 'w':
+ *eventtype = eventtypeFdWrite
+ widx = len(subs)
+ }
+
+ if len(subs) == disarmed {
+ throw("overflow")
+ }
+
+ pd.user = uint32(ridx)<<16 | uint32(widx)
+
+ subs = append(subs, s)
+ evts = append(evts, event{})
+
+ unlock(&mtx)
+}
+
+func netpolldisarm(pd *pollDesc, mode int32) {
+ switch mode {
+ case 'r':
+ removesub(int(pd.user >> 16))
+ case 'w':
+ removesub(int(pd.user & 0xFFFF))
+ case 'r' + 'w':
+ removesub(int(pd.user >> 16))
+ removesub(int(pd.user & 0xFFFF))
+ }
+}
+
+func removesub(i int) {
+ if i == disarmed {
+ return
+ }
+ j := len(subs) - 1
+
+ pdi := (*pollDesc)(unsafe.Pointer(uintptr(subs[i].userdata)))
+ pdj := (*pollDesc)(unsafe.Pointer(uintptr(subs[j].userdata)))
+
+ swapsub(pdi, i, disarmed)
+ swapsub(pdj, j, i)
+
+ subs = subs[:j]
+}
+
+func swapsub(pd *pollDesc, from, to int) {
+ if from == to {
+ return
+ }
+ ridx := int(pd.user >> 16)
+ widx := int(pd.user & 0xFFFF)
+ if ridx == from {
+ ridx = to
+ } else if widx == from {
+ widx = to
+ }
+ pd.user = uint32(ridx)<<16 | uint32(widx)
+ if to != disarmed {
+ subs[to], subs[from] = subs[from], subs[to]
+ }
+}
+
+func netpollclose(fd uintptr) int32 {
+ lock(&mtx)
+ for i := 0; i < len(pds); i++ {
+ if pds[i].fd == fd {
+ netpolldisarm(pds[i], 'r'+'w')
+ pds[i] = pds[len(pds)-1]
+ pds = pds[:len(pds)-1]
+ break
+ }
+ }
+ unlock(&mtx)
+ return 0
+}
+
+func netpollBreak() {}
+
+func netpoll(delay int64) gList {
+ lock(&mtx)
+
+ // If delay >= 0, we include a subscription of type Clock that we use as
+ // a timeout. If delay < 0, we omit the subscription and allow poll_oneoff
+ // to block indefinitely.
+ pollsubs := subs
+ if delay >= 0 {
+ timeout := &subs[0]
+ clock := timeout.u.subscriptionClock()
+ clock.timeout = uint64(delay)
+ } else {
+ pollsubs = subs[1:]
+ }
+
+ if len(pollsubs) == 0 {
+ unlock(&mtx)
+ return gList{}
+ }
+
+ evts = evts[:len(pollsubs)]
+ for i := range evts {
+ evts[i] = event{}
+ }
+
+retry:
+ var nevents size
+ errno := poll_oneoff(unsafe.Pointer(&pollsubs[0]), unsafe.Pointer(&evts[0]), uint32(len(pollsubs)), unsafe.Pointer(&nevents))
+ if errno != 0 {
+ if errno != _EINTR {
+ println("errno=", errno, " len(pollsubs)=", len(pollsubs))
+ throw("poll_oneoff failed")
+ }
+ // If a timed sleep was interrupted, just return to
+ // recalculate how long we should sleep now.
+ if delay > 0 {
+ unlock(&mtx)
+ return gList{}
+ }
+ goto retry
+ }
+
+ var toRun gList
+ for i := 0; i < int(nevents); i++ {
+ e := &evts[i]
+ if e.typ == eventtypeClock {
+ continue
+ }
+
+ hangup := e.fdReadwrite.flags&fdReadwriteHangup != 0
+ var mode int32
+ if e.typ == eventtypeFdRead || e.error != 0 || hangup {
+ mode += 'r'
+ }
+ if e.typ == eventtypeFdWrite || e.error != 0 || hangup {
+ mode += 'w'
+ }
+ if mode != 0 {
+ pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata)))
+ netpolldisarm(pd, mode)
+ pd.setEventErr(e.error != 0, 0)
+ netpollready(&toRun, pd, mode)
+ }
+ }
+
+ unlock(&mtx)
+ return toRun
+}
diff --git a/src/runtime/os_wasip1.go b/src/runtime/os_wasip1.go
index 577d9652dd..8811bb6178 100644
--- a/src/runtime/os_wasip1.go
+++ b/src/runtime/os_wasip1.go
@@ -123,6 +123,10 @@ type subscriptionClock struct {
flags subclockflags
}
+type subscriptionFdReadwrite struct {
+ fd int32
+}
+
type subscription struct {
userdata userdata
u subscriptionUnion
@@ -138,6 +142,10 @@ func (u *subscriptionUnion) subscriptionClock() *subscriptionClock {
return (*subscriptionClock)(unsafe.Pointer(&u[1]))
}
+func (u *subscriptionUnion) subscriptionFdReadwrite() *subscriptionFdReadwrite {
+ return (*subscriptionFdReadwrite)(unsafe.Pointer(&u[1]))
+}
+
//go:wasmimport wasi_snapshot_preview1 poll_oneoff
//go:noescape
func poll_oneoff(in, out unsafe.Pointer, nsubscriptions size, nevents unsafe.Pointer) errno