1
0
mirror of https://github.com/golang/go synced 2024-09-29 17:24:34 -06:00

runtime: converge duplicate calls to netpollBreak into one

There might be some concurrent (maybe not concurrent, just sequential but in a short time window) and duplicate calls to `netpollBreak`, trying to wake up a net-poller. If one has called `netpollBreak` and that waking event hasn't been received by epollwait/kevent/..., then the subsequent calls of `netpollBreak` ought to be ignored or in other words, these calls should be converged into one.

Benchmarks go1.13.5 darwin/amd64:

benchmark-func           time/op (old)  time/op (new)  delta
BenchmarkNetpollBreak-4  29668ns ±1%    3131ns ±2%     -89.45%

mem/B (old)  mem/B (new)  delta
154B ±13%    0B ±0%       -100%

Change-Id: I3cf757a5d6edc5a99adad7aea3baee4b7f2a8f5c
GitHub-Last-Rev: 15bcfbab8a
GitHub-Pull-Request: golang/go#36294
Reviewed-on: https://go-review.googlesource.com/c/go/+/212737
Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
This commit is contained in:
Andy Pan 2020-03-27 03:21:17 +00:00 committed by Ian Lance Taylor
parent e4a1cf8a56
commit 0cc1290174
5 changed files with 98 additions and 39 deletions

View File

@ -6,7 +6,10 @@
package runtime package runtime
import "unsafe" import (
"runtime/internal/atomic"
"unsafe"
)
func epollcreate(size int32) int32 func epollcreate(size int32) int32
func epollcreate1(flags int32) int32 func epollcreate1(flags int32) int32
@ -22,6 +25,8 @@ var (
epfd int32 = -1 // epoll descriptor epfd int32 = -1 // epoll descriptor
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
) )
func netpollinit() { func netpollinit() {
@ -74,20 +79,22 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts an epollwait. // netpollBreak interrupts an epollwait.
func netpollBreak() { func netpollBreak() {
for { if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
var b byte for {
n := write(netpollBreakWr, unsafe.Pointer(&b), 1) var b byte
if n == 1 { n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
break if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
} }
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
} }
} }
@ -147,6 +154,7 @@ retry:
// if blocking. // if blocking.
var tmp [16]byte var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Storeuintptr(&netpollWakeSig, 0)
} }
continue continue
} }

View File

@ -8,12 +8,17 @@ package runtime
// Integrated network poller (kqueue-based implementation). // Integrated network poller (kqueue-based implementation).
import "unsafe" import (
"runtime/internal/atomic"
"unsafe"
)
var ( var (
kq int32 = -1 kq int32 = -1
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
) )
func netpollinit() { func netpollinit() {
@ -78,17 +83,19 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts a kevent. // netpollBreak interrupts a kevent.
func netpollBreak() { func netpollBreak() {
for { if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
var b byte for {
n := write(netpollBreakWr, unsafe.Pointer(&b), 1) var b byte
if n == 1 || n == -_EAGAIN { n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
break if n == 1 || n == -_EAGAIN {
break
}
if n == -_EINTR {
continue
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
} }
if n == -_EINTR {
continue
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
} }
} }
@ -145,6 +152,7 @@ retry:
// if blocking. // if blocking.
var tmp [16]byte var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Storeuintptr(&netpollWakeSig, 0)
} }
continue continue
} }

View File

@ -0,0 +1,28 @@
package runtime_test
import (
"runtime"
"sync"
"testing"
)
var wg sync.WaitGroup
func init() {
runtime.NetpollGenericInit()
}
func BenchmarkNetpollBreak(b *testing.B) {
b.StartTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10; j++ {
wg.Add(1)
go func() {
runtime.NetpollBreak()
wg.Done()
}()
}
}
wg.Wait()
b.StopTimer()
}

View File

@ -4,7 +4,10 @@
package runtime package runtime
import "unsafe" import (
"runtime/internal/atomic"
"unsafe"
)
// Solaris runtime-integrated network poller. // Solaris runtime-integrated network poller.
// //
@ -85,6 +88,7 @@ var (
libc_port_dissociate, libc_port_dissociate,
libc_port_getn, libc_port_getn,
libc_port_alert libcFunc libc_port_alert libcFunc
netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
) )
func errno() int32 { func errno() int32 {
@ -187,15 +191,17 @@ func netpollarm(pd *pollDesc, mode int) {
// netpollBreak interrupts a port_getn wait. // netpollBreak interrupts a port_getn wait.
func netpollBreak() { func netpollBreak() {
// Use port_alert to put portfd into alert mode. if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
// This will wake up all threads sleeping in port_getn on portfd, // Use port_alert to put portfd into alert mode.
// and cause their calls to port_getn to return immediately. // This will wake up all threads sleeping in port_getn on portfd,
// Further, until portfd is taken out of alert mode, // and cause their calls to port_getn to return immediately.
// all calls to port_getn will return immediately. // Further, until portfd is taken out of alert mode,
if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 { // all calls to port_getn will return immediately.
if e := errno(); e != _EBUSY { if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
println("runtime: port_alert failed with", e) if e := errno(); e != _EBUSY {
throw("runtime: netpoll: port_alert failed") println("runtime: port_alert failed with", e)
throw("runtime: netpoll: port_alert failed")
}
} }
} }
} }
@ -268,6 +274,7 @@ retry:
println("runtime: port_alert failed with", e) println("runtime: port_alert failed with", e)
throw("runtime: netpoll: port_alert failed") throw("runtime: netpoll: port_alert failed")
} }
atomic.Storeuintptr(&netpollWakeSig, 0)
} }
continue continue
} }

View File

@ -5,6 +5,7 @@
package runtime package runtime
import ( import (
"runtime/internal/atomic"
"unsafe" "unsafe"
) )
@ -31,7 +32,11 @@ type overlappedEntry struct {
qty uint32 qty uint32
} }
var iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle var (
iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle
netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
)
func netpollinit() { func netpollinit() {
iocphandle = stdcall4(_CreateIoCompletionPort, _INVALID_HANDLE_VALUE, 0, 0, _DWORD_MAX) iocphandle = stdcall4(_CreateIoCompletionPort, _INVALID_HANDLE_VALUE, 0, 0, _DWORD_MAX)
@ -62,9 +67,11 @@ func netpollarm(pd *pollDesc, mode int) {
} }
func netpollBreak() { func netpollBreak() {
if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 { if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")") if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
throw("runtime: netpoll: PostQueuedCompletionStatus failed") println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
throw("runtime: netpoll: PostQueuedCompletionStatus failed")
}
} }
} }
@ -126,6 +133,7 @@ func netpoll(delay int64) gList {
} }
handlecompletion(&toRun, op, errno, qty) handlecompletion(&toRun, op, errno, qty)
} else { } else {
atomic.Storeuintptr(&netpollWakeSig, 0)
if delay == 0 { if delay == 0 {
// Forward the notification to the // Forward the notification to the
// blocked poller. // blocked poller.