From 0cc1290174751971d282196e21ec9037b217e5a5 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 27 Mar 2020 03:21:17 +0000 Subject: [PATCH] runtime: converge duplicate calls to netpollBreak into one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There might be some concurrent (maybe not concurrent, just sequential but in a short time window) and duplicate calls to `netpollBreak`, trying to wake up a net-poller. If one has called `netpollBreak` and that waking event hasn't been received by epollwait/kevent/..., then the subsequent calls of `netpollBreak` ought to be ignored or in other words, these calls should be converged into one. Benchmarks go1.13.5 darwin/amd64: benchmark-func time/op (old) time/op (new) delta BenchmarkNetpollBreak-4 29668ns ±1% 3131ns ±2% -89.45% mem/B (old) mem/B (new) delta 154B ±13% 0B ±0% -100% Change-Id: I3cf757a5d6edc5a99adad7aea3baee4b7f2a8f5c GitHub-Last-Rev: 15bcfbab8a5db51f65da01315a5880a5dbf9e028 GitHub-Pull-Request: golang/go#36294 Reviewed-on: https://go-review.googlesource.com/c/go/+/212737 Run-TryBot: Emmanuel Odeke TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- src/runtime/netpoll_epoll.go | 36 +++++++++++++++++++++------------- src/runtime/netpoll_kqueue.go | 30 +++++++++++++++++----------- src/runtime/netpoll_os_test.go | 28 ++++++++++++++++++++++++++ src/runtime/netpoll_solaris.go | 27 +++++++++++++++---------- src/runtime/netpoll_windows.go | 16 +++++++++++---- 5 files changed, 98 insertions(+), 39 deletions(-) create mode 100644 src/runtime/netpoll_os_test.go diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go index b9dc18c939..cc4c36f796 100644 --- a/src/runtime/netpoll_epoll.go +++ b/src/runtime/netpoll_epoll.go @@ -6,7 +6,10 @@ package runtime -import "unsafe" +import ( + "runtime/internal/atomic" + "unsafe" +) func epollcreate(size int32) int32 func epollcreate1(flags int32) int32 @@ -22,6 +25,8 @@ var ( epfd int32 = -1 // epoll descriptor netpollBreakRd, netpollBreakWr uintptr // for netpollBreak + + netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak ) func netpollinit() { @@ -74,20 +79,22 @@ func netpollarm(pd *pollDesc, mode int) { // netpollBreak interrupts an epollwait. func netpollBreak() { - for { - var b byte - n := write(netpollBreakWr, unsafe.Pointer(&b), 1) - if n == 1 { - break + if atomic.Casuintptr(&netpollWakeSig, 0, 1) { + for { + var b byte + n := write(netpollBreakWr, unsafe.Pointer(&b), 1) + if n == 1 { + break + } + if n == -_EINTR { + continue + } + if n == -_EAGAIN { + return + } + println("runtime: netpollBreak write failed with", -n) + throw("runtime: netpollBreak write failed") } - if n == -_EINTR { - continue - } - if n == -_EAGAIN { - return - } - println("runtime: netpollBreak write failed with", -n) - throw("runtime: netpollBreak write failed") } } @@ -147,6 +154,7 @@ retry: // if blocking. var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) + atomic.Storeuintptr(&netpollWakeSig, 0) } continue } diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go index 39d402252d..2ff21d8fcb 100644 --- a/src/runtime/netpoll_kqueue.go +++ b/src/runtime/netpoll_kqueue.go @@ -8,12 +8,17 @@ package runtime // Integrated network poller (kqueue-based implementation). -import "unsafe" +import ( + "runtime/internal/atomic" + "unsafe" +) var ( kq int32 = -1 netpollBreakRd, netpollBreakWr uintptr // for netpollBreak + + netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak ) func netpollinit() { @@ -78,17 +83,19 @@ func netpollarm(pd *pollDesc, mode int) { // netpollBreak interrupts a kevent. func netpollBreak() { - for { - var b byte - n := write(netpollBreakWr, unsafe.Pointer(&b), 1) - if n == 1 || n == -_EAGAIN { - break + if atomic.Casuintptr(&netpollWakeSig, 0, 1) { + for { + var b byte + n := write(netpollBreakWr, unsafe.Pointer(&b), 1) + if n == 1 || n == -_EAGAIN { + break + } + if n == -_EINTR { + continue + } + println("runtime: netpollBreak write failed with", -n) + throw("runtime: netpollBreak write failed") } - if n == -_EINTR { - continue - } - println("runtime: netpollBreak write failed with", -n) - throw("runtime: netpollBreak write failed") } } @@ -145,6 +152,7 @@ retry: // if blocking. var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) + atomic.Storeuintptr(&netpollWakeSig, 0) } continue } diff --git a/src/runtime/netpoll_os_test.go b/src/runtime/netpoll_os_test.go new file mode 100644 index 0000000000..b96b9f3ee3 --- /dev/null +++ b/src/runtime/netpoll_os_test.go @@ -0,0 +1,28 @@ +package runtime_test + +import ( + "runtime" + "sync" + "testing" +) + +var wg sync.WaitGroup + +func init() { + runtime.NetpollGenericInit() +} + +func BenchmarkNetpollBreak(b *testing.B) { + b.StartTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < 10; j++ { + wg.Add(1) + go func() { + runtime.NetpollBreak() + wg.Done() + }() + } + } + wg.Wait() + b.StopTimer() +} diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go index 15818cb4ea..34b3ee9308 100644 --- a/src/runtime/netpoll_solaris.go +++ b/src/runtime/netpoll_solaris.go @@ -4,7 +4,10 @@ package runtime -import "unsafe" +import ( + "runtime/internal/atomic" + "unsafe" +) // Solaris runtime-integrated network poller. // @@ -85,6 +88,7 @@ var ( libc_port_dissociate, libc_port_getn, libc_port_alert libcFunc + netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak ) func errno() int32 { @@ -187,15 +191,17 @@ func netpollarm(pd *pollDesc, mode int) { // netpollBreak interrupts a port_getn wait. func netpollBreak() { - // Use port_alert to put portfd into alert mode. - // This will wake up all threads sleeping in port_getn on portfd, - // and cause their calls to port_getn to return immediately. - // Further, until portfd is taken out of alert mode, - // all calls to port_getn will return immediately. - if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 { - if e := errno(); e != _EBUSY { - println("runtime: port_alert failed with", e) - throw("runtime: netpoll: port_alert failed") + if atomic.Casuintptr(&netpollWakeSig, 0, 1) { + // Use port_alert to put portfd into alert mode. + // This will wake up all threads sleeping in port_getn on portfd, + // and cause their calls to port_getn to return immediately. + // Further, until portfd is taken out of alert mode, + // all calls to port_getn will return immediately. + if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 { + if e := errno(); e != _EBUSY { + println("runtime: port_alert failed with", e) + throw("runtime: netpoll: port_alert failed") + } } } } @@ -268,6 +274,7 @@ retry: println("runtime: port_alert failed with", e) throw("runtime: netpoll: port_alert failed") } + atomic.Storeuintptr(&netpollWakeSig, 0) } continue } diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go index 28b2f6ef3b..56a0798559 100644 --- a/src/runtime/netpoll_windows.go +++ b/src/runtime/netpoll_windows.go @@ -5,6 +5,7 @@ package runtime import ( + "runtime/internal/atomic" "unsafe" ) @@ -31,7 +32,11 @@ type overlappedEntry struct { qty uint32 } -var iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle +var ( + iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle + + netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak +) func netpollinit() { iocphandle = stdcall4(_CreateIoCompletionPort, _INVALID_HANDLE_VALUE, 0, 0, _DWORD_MAX) @@ -62,9 +67,11 @@ func netpollarm(pd *pollDesc, mode int) { } func netpollBreak() { - if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 { - println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")") - throw("runtime: netpoll: PostQueuedCompletionStatus failed") + if atomic.Casuintptr(&netpollWakeSig, 0, 1) { + if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 { + println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")") + throw("runtime: netpoll: PostQueuedCompletionStatus failed") + } } } @@ -126,6 +133,7 @@ func netpoll(delay int64) gList { } handlecompletion(&toRun, op, errno, qty) } else { + atomic.Storeuintptr(&netpollWakeSig, 0) if delay == 0 { // Forward the notification to the // blocked poller.