aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Pan <panjf2000@gmail.com>2020-03-27 03:21:17 +0000
committerIan Lance Taylor <iant@golang.org>2020-03-27 17:14:16 +0000
commit0cc1290174751971d282196e21ec9037b217e5a5 (patch)
treeb6d5698c1c4bc33df60d0259187d225326721cfd
parente4a1cf8a5698d7351af0e33d61e4f7078f3ab1ce (diff)
downloadgo-0cc1290174751971d282196e21ec9037b217e5a5.tar.gz
go-0cc1290174751971d282196e21ec9037b217e5a5.zip
runtime: converge duplicate calls to netpollBreak into one
There might be some concurrent (maybe not concurrent, just sequential but in a short time window) and duplicate calls to `netpollBreak`, trying to wake up a net-poller. If one has called `netpollBreak` and that waking event hasn't been received by epollwait/kevent/..., then the subsequent calls of `netpollBreak` ought to be ignored or in other words, these calls should be converged into one. Benchmarks go1.13.5 darwin/amd64: benchmark-func time/op (old) time/op (new) delta BenchmarkNetpollBreak-4 29668ns ±1% 3131ns ±2% -89.45% mem/B (old) mem/B (new) delta 154B ±13% 0B ±0% -100% Change-Id: I3cf757a5d6edc5a99adad7aea3baee4b7f2a8f5c GitHub-Last-Rev: 15bcfbab8a5db51f65da01315a5880a5dbf9e028 GitHub-Pull-Request: golang/go#36294 Reviewed-on: https://go-review.googlesource.com/c/go/+/212737 Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Ian Lance Taylor <iant@golang.org>
-rw-r--r--src/runtime/netpoll_epoll.go36
-rw-r--r--src/runtime/netpoll_kqueue.go30
-rw-r--r--src/runtime/netpoll_os_test.go28
-rw-r--r--src/runtime/netpoll_solaris.go27
-rw-r--r--src/runtime/netpoll_windows.go16
5 files changed, 98 insertions, 39 deletions
diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go
index b9dc18c939..cc4c36f796 100644
--- a/src/runtime/netpoll_epoll.go
+++ b/src/runtime/netpoll_epoll.go
@@ -6,7 +6,10 @@
package runtime
-import "unsafe"
+import (
+ "runtime/internal/atomic"
+ "unsafe"
+)
func epollcreate(size int32) int32
func epollcreate1(flags int32) int32
@@ -22,6 +25,8 @@ var (
epfd int32 = -1 // epoll descriptor
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
+
+ netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
)
func netpollinit() {
@@ -74,20 +79,22 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts an epollwait.
func netpollBreak() {
- for {
- var b byte
- n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
- if n == 1 {
- break
- }
- if n == -_EINTR {
- continue
- }
- if n == -_EAGAIN {
- return
+ if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+ for {
+ var b byte
+ n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+ if n == 1 {
+ break
+ }
+ if n == -_EINTR {
+ continue
+ }
+ if n == -_EAGAIN {
+ return
+ }
+ println("runtime: netpollBreak write failed with", -n)
+ throw("runtime: netpollBreak write failed")
}
- println("runtime: netpollBreak write failed with", -n)
- throw("runtime: netpollBreak write failed")
}
}
@@ -147,6 +154,7 @@ retry:
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+ atomic.Storeuintptr(&netpollWakeSig, 0)
}
continue
}
diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go
index 39d402252d..2ff21d8fcb 100644
--- a/src/runtime/netpoll_kqueue.go
+++ b/src/runtime/netpoll_kqueue.go
@@ -8,12 +8,17 @@ package runtime
// Integrated network poller (kqueue-based implementation).
-import "unsafe"
+import (
+ "runtime/internal/atomic"
+ "unsafe"
+)
var (
kq int32 = -1
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
+
+ netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
)
func netpollinit() {
@@ -78,17 +83,19 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts a kevent.
func netpollBreak() {
- for {
- var b byte
- n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
- if n == 1 || n == -_EAGAIN {
- break
- }
- if n == -_EINTR {
- continue
+ if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+ for {
+ var b byte
+ n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+ if n == 1 || n == -_EAGAIN {
+ break
+ }
+ if n == -_EINTR {
+ continue
+ }
+ println("runtime: netpollBreak write failed with", -n)
+ throw("runtime: netpollBreak write failed")
}
- println("runtime: netpollBreak write failed with", -n)
- throw("runtime: netpollBreak write failed")
}
}
@@ -145,6 +152,7 @@ retry:
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+ atomic.Storeuintptr(&netpollWakeSig, 0)
}
continue
}
diff --git a/src/runtime/netpoll_os_test.go b/src/runtime/netpoll_os_test.go
new file mode 100644
index 0000000000..b96b9f3ee3
--- /dev/null
+++ b/src/runtime/netpoll_os_test.go
@@ -0,0 +1,28 @@
+package runtime_test
+
+import (
+ "runtime"
+ "sync"
+ "testing"
+)
+
+var wg sync.WaitGroup
+
+func init() {
+ runtime.NetpollGenericInit()
+}
+
+func BenchmarkNetpollBreak(b *testing.B) {
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ for j := 0; j < 10; j++ {
+ wg.Add(1)
+ go func() {
+ runtime.NetpollBreak()
+ wg.Done()
+ }()
+ }
+ }
+ wg.Wait()
+ b.StopTimer()
+}
diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go
index 15818cb4ea..34b3ee9308 100644
--- a/src/runtime/netpoll_solaris.go
+++ b/src/runtime/netpoll_solaris.go
@@ -4,7 +4,10 @@
package runtime
-import "unsafe"
+import (
+ "runtime/internal/atomic"
+ "unsafe"
+)
// Solaris runtime-integrated network poller.
//
@@ -85,6 +88,7 @@ var (
libc_port_dissociate,
libc_port_getn,
libc_port_alert libcFunc
+ netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
)
func errno() int32 {
@@ -187,15 +191,17 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts a port_getn wait.
func netpollBreak() {
- // Use port_alert to put portfd into alert mode.
- // This will wake up all threads sleeping in port_getn on portfd,
- // and cause their calls to port_getn to return immediately.
- // Further, until portfd is taken out of alert mode,
- // all calls to port_getn will return immediately.
- if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
- if e := errno(); e != _EBUSY {
- println("runtime: port_alert failed with", e)
- throw("runtime: netpoll: port_alert failed")
+ if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+ // Use port_alert to put portfd into alert mode.
+ // This will wake up all threads sleeping in port_getn on portfd,
+ // and cause their calls to port_getn to return immediately.
+ // Further, until portfd is taken out of alert mode,
+ // all calls to port_getn will return immediately.
+ if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
+ if e := errno(); e != _EBUSY {
+ println("runtime: port_alert failed with", e)
+ throw("runtime: netpoll: port_alert failed")
+ }
}
}
}
@@ -268,6 +274,7 @@ retry:
println("runtime: port_alert failed with", e)
throw("runtime: netpoll: port_alert failed")
}
+ atomic.Storeuintptr(&netpollWakeSig, 0)
}
continue
}
diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go
index 28b2f6ef3b..56a0798559 100644
--- a/src/runtime/netpoll_windows.go
+++ b/src/runtime/netpoll_windows.go
@@ -5,6 +5,7 @@
package runtime
import (
+ "runtime/internal/atomic"
"unsafe"
)
@@ -31,7 +32,11 @@ type overlappedEntry struct {
qty uint32
}
-var iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle
+var (
+ iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle
+
+ netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
+)
func netpollinit() {
iocphandle = stdcall4(_CreateIoCompletionPort, _INVALID_HANDLE_VALUE, 0, 0, _DWORD_MAX)
@@ -62,9 +67,11 @@ func netpollarm(pd *pollDesc, mode int) {
}
func netpollBreak() {
- if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
- println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
- throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+ if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+ if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
+ println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
+ throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+ }
}
}
@@ -126,6 +133,7 @@ func netpoll(delay int64) gList {
}
handlecompletion(&toRun, op, errno, qty)
} else {
+ atomic.Storeuintptr(&netpollWakeSig, 0)
if delay == 0 {
// Forward the notification to the
// blocked poller.