From 50f4896b72d16b6538178c8ca851b20655075b7f Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Fri, 5 Apr 2019 15:53:12 -0700 Subject: [PATCH] runtime: add netpollBreak The new netpollBreak function can be used to interrupt a blocking netpoll. This function is not currently used; it will be used by later CLs. Updates #27707 Change-Id: I5cb936609ba13c3c127ea1368a49194fc58c9f4d Reviewed-on: https://go-review.googlesource.com/c/go/+/171824 Run-TryBot: Ian Lance Taylor Reviewed-by: Michael Knyszek --- src/runtime/defs1_netbsd_386.go | 1 + src/runtime/defs1_netbsd_amd64.go | 1 + src/runtime/defs1_netbsd_arm.go | 1 + src/runtime/defs1_netbsd_arm64.go | 1 + src/runtime/defs1_solaris_amd64.go | 5 ++- src/runtime/defs_darwin.go | 1 + src/runtime/defs_darwin_386.go | 1 + src/runtime/defs_darwin_amd64.go | 1 + src/runtime/defs_darwin_arm.go | 1 + src/runtime/defs_darwin_arm64.go | 1 + src/runtime/defs_freebsd.go | 1 + src/runtime/defs_freebsd_386.go | 1 + src/runtime/defs_freebsd_amd64.go | 1 + src/runtime/defs_freebsd_arm.go | 1 + src/runtime/defs_netbsd.go | 1 + src/runtime/defs_openbsd.go | 1 + src/runtime/defs_openbsd_386.go | 1 + src/runtime/defs_openbsd_amd64.go | 1 + src/runtime/defs_openbsd_arm.go | 1 + src/runtime/defs_openbsd_arm64.go | 1 + src/runtime/defs_solaris.go | 5 ++- src/runtime/export_test.go | 3 ++ src/runtime/netpoll.go | 44 +++++++++++------- src/runtime/netpoll_aix.go | 22 +++++---- src/runtime/netpoll_epoll.go | 71 +++++++++++++++++++++++++----- src/runtime/netpoll_fake.go | 7 ++- src/runtime/netpoll_kqueue.go | 55 ++++++++++++++++++++++- src/runtime/netpoll_solaris.go | 46 +++++++++++++++++-- src/runtime/netpoll_stub.go | 17 +++++++ src/runtime/netpoll_windows.go | 37 +++++++++++++--- src/runtime/os_netbsd.go | 2 - src/runtime/os_openbsd.go | 1 - src/runtime/os_windows.go | 2 + src/runtime/proc_test.go | 39 ++++++++++++++++ 34 files changed, 322 insertions(+), 53 deletions(-) diff --git a/src/runtime/defs1_netbsd_386.go b/src/runtime/defs1_netbsd_386.go index da48cc84c27..a4548e6f062 100644 --- a/src/runtime/defs1_netbsd_386.go +++ b/src/runtime/defs1_netbsd_386.go @@ -6,6 +6,7 @@ package runtime const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs1_netbsd_amd64.go b/src/runtime/defs1_netbsd_amd64.go index 0b25b8da7c4..4b0e79ebb6f 100644 --- a/src/runtime/defs1_netbsd_amd64.go +++ b/src/runtime/defs1_netbsd_amd64.go @@ -6,6 +6,7 @@ package runtime const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs1_netbsd_arm.go b/src/runtime/defs1_netbsd_arm.go index 4738b546d17..2b5d5990d36 100644 --- a/src/runtime/defs1_netbsd_arm.go +++ b/src/runtime/defs1_netbsd_arm.go @@ -6,6 +6,7 @@ package runtime const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs1_netbsd_arm64.go b/src/runtime/defs1_netbsd_arm64.go index 14c07d17040..740dc77658f 100644 --- a/src/runtime/defs1_netbsd_arm64.go +++ b/src/runtime/defs1_netbsd_arm64.go @@ -6,6 +6,7 @@ package runtime const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs1_solaris_amd64.go b/src/runtime/defs1_solaris_amd64.go index 14b5c7949e4..ee6c45e5242 100644 --- a/src/runtime/defs1_solaris_amd64.go +++ b/src/runtime/defs1_solaris_amd64.go @@ -8,6 +8,7 @@ const ( _EBADF = 0x9 _EFAULT = 0xe _EAGAIN = 0xb + _EBUSY = 0x10 _ETIME = 0x3e _ETIMEDOUT = 0x91 _EWOULDBLOCK = 0xb @@ -100,7 +101,9 @@ const ( _POLLHUP = 0x10 _POLLERR = 0x8 - _PORT_SOURCE_FD = 0x4 + _PORT_SOURCE_FD = 0x4 + _PORT_SOURCE_ALERT = 0x5 + _PORT_ALERT_UPDATE = 0x2 ) type semt struct { diff --git a/src/runtime/defs_darwin.go b/src/runtime/defs_darwin.go index 0cd133f6e02..de1489f0326 100644 --- a/src/runtime/defs_darwin.go +++ b/src/runtime/defs_darwin.go @@ -30,6 +30,7 @@ import "C" const ( EINTR = C.EINTR EFAULT = C.EFAULT + EAGAIN = C.EAGAIN ETIMEDOUT = C.ETIMEDOUT PROT_NONE = C.PROT_NONE diff --git a/src/runtime/defs_darwin_386.go b/src/runtime/defs_darwin_386.go index 83928e78415..a78f54bcf56 100644 --- a/src/runtime/defs_darwin_386.go +++ b/src/runtime/defs_darwin_386.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ETIMEDOUT = 0x3c _PROT_NONE = 0x0 diff --git a/src/runtime/defs_darwin_amd64.go b/src/runtime/defs_darwin_amd64.go index 45c34a8fc04..cbc26bfcffa 100644 --- a/src/runtime/defs_darwin_amd64.go +++ b/src/runtime/defs_darwin_amd64.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ETIMEDOUT = 0x3c _PROT_NONE = 0x0 diff --git a/src/runtime/defs_darwin_arm.go b/src/runtime/defs_darwin_arm.go index 5e2af978a7b..199886aad1f 100644 --- a/src/runtime/defs_darwin_arm.go +++ b/src/runtime/defs_darwin_arm.go @@ -10,6 +10,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ETIMEDOUT = 0x3c _PROT_NONE = 0x0 diff --git a/src/runtime/defs_darwin_arm64.go b/src/runtime/defs_darwin_arm64.go index f673eb7b24b..2f466045d4f 100644 --- a/src/runtime/defs_darwin_arm64.go +++ b/src/runtime/defs_darwin_arm64.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ETIMEDOUT = 0x3c _PROT_NONE = 0x0 diff --git a/src/runtime/defs_freebsd.go b/src/runtime/defs_freebsd.go index 700e06eb809..e196dff0769 100644 --- a/src/runtime/defs_freebsd.go +++ b/src/runtime/defs_freebsd.go @@ -47,6 +47,7 @@ const ( const ( EINTR = C.EINTR EFAULT = C.EFAULT + EAGAIN = C.EAGAIN ENOSYS = C.ENOSYS O_NONBLOCK = C.O_NONBLOCK diff --git a/src/runtime/defs_freebsd_386.go b/src/runtime/defs_freebsd_386.go index c113eee34c8..6294fc32d4c 100644 --- a/src/runtime/defs_freebsd_386.go +++ b/src/runtime/defs_freebsd_386.go @@ -15,6 +15,7 @@ const ( const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_freebsd_amd64.go b/src/runtime/defs_freebsd_amd64.go index 9105cc392b1..840c710eeb9 100644 --- a/src/runtime/defs_freebsd_amd64.go +++ b/src/runtime/defs_freebsd_amd64.go @@ -15,6 +15,7 @@ const ( const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_freebsd_arm.go b/src/runtime/defs_freebsd_arm.go index cf7ca696f4f..3307c8bbae3 100644 --- a/src/runtime/defs_freebsd_arm.go +++ b/src/runtime/defs_freebsd_arm.go @@ -15,6 +15,7 @@ const ( const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_netbsd.go b/src/runtime/defs_netbsd.go index 40eeb8c70f0..3f5ce5adcac 100644 --- a/src/runtime/defs_netbsd.go +++ b/src/runtime/defs_netbsd.go @@ -32,6 +32,7 @@ import "C" const ( EINTR = C.EINTR EFAULT = C.EFAULT + EAGAIN = C.EAGAIN ENOSYS = C.ENOSYS O_NONBLOCK = C.O_NONBLOCK diff --git a/src/runtime/defs_openbsd.go b/src/runtime/defs_openbsd.go index c425864b214..4774e36c923 100644 --- a/src/runtime/defs_openbsd.go +++ b/src/runtime/defs_openbsd.go @@ -28,6 +28,7 @@ import "C" const ( EINTR = C.EINTR EFAULT = C.EFAULT + EAGAIN = C.EAGAIN ENOSYS = C.ENOSYS O_NONBLOCK = C.O_NONBLOCK diff --git a/src/runtime/defs_openbsd_386.go b/src/runtime/defs_openbsd_386.go index 0c89bf28ccf..35f2e53fcf0 100644 --- a/src/runtime/defs_openbsd_386.go +++ b/src/runtime/defs_openbsd_386.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_openbsd_amd64.go b/src/runtime/defs_openbsd_amd64.go index 8d3523bf98c..c187a98ae0c 100644 --- a/src/runtime/defs_openbsd_amd64.go +++ b/src/runtime/defs_openbsd_amd64.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_openbsd_arm.go b/src/runtime/defs_openbsd_arm.go index 8f5a5d6e9de..170bb3876c4 100644 --- a/src/runtime/defs_openbsd_arm.go +++ b/src/runtime/defs_openbsd_arm.go @@ -8,6 +8,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_openbsd_arm64.go b/src/runtime/defs_openbsd_arm64.go index c4ddefbd74d..8b8d5cddf2e 100644 --- a/src/runtime/defs_openbsd_arm64.go +++ b/src/runtime/defs_openbsd_arm64.go @@ -5,6 +5,7 @@ import "unsafe" const ( _EINTR = 0x4 _EFAULT = 0xe + _EAGAIN = 0x23 _ENOSYS = 0x4e _O_NONBLOCK = 0x4 diff --git a/src/runtime/defs_solaris.go b/src/runtime/defs_solaris.go index b8ef12a1459..f42adebee37 100644 --- a/src/runtime/defs_solaris.go +++ b/src/runtime/defs_solaris.go @@ -38,6 +38,7 @@ const ( EBADF = C.EBADF EFAULT = C.EFAULT EAGAIN = C.EAGAIN + EBUSY = C.EBUSY ETIME = C.ETIME ETIMEDOUT = C.ETIMEDOUT EWOULDBLOCK = C.EWOULDBLOCK @@ -129,7 +130,9 @@ const ( POLLHUP = C.POLLHUP POLLERR = C.POLLERR - PORT_SOURCE_FD = C.PORT_SOURCE_FD + PORT_SOURCE_FD = C.PORT_SOURCE_FD + PORT_SOURCE_ALERT = C.PORT_SOURCE_ALERT + PORT_ALERT_UPDATE = C.PORT_ALERT_UPDATE ) type SemT C.sem_t diff --git a/src/runtime/export_test.go b/src/runtime/export_test.go index e4a7faf9659..42a456c7079 100644 --- a/src/runtime/export_test.go +++ b/src/runtime/export_test.go @@ -35,6 +35,9 @@ var Atoi = atoi var Atoi32 = atoi32 var Nanotime = nanotime +var Netpoll = netpoll +var NetpollBreak = netpollBreak +var Usleep = usleep var PhysHugePageSize = physHugePageSize diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go index adb072db38b..7d18dcaeeab 100644 --- a/src/runtime/netpoll.go +++ b/src/runtime/netpoll.go @@ -12,12 +12,26 @@ import ( ) // Integrated network poller (platform-independent part). -// A particular implementation (epoll/kqueue) must define the following functions: -// func netpollinit() // to initialize the poller -// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications -// and associate fd with pd. -// An implementation must call the following function to denote that the pd is ready. -// func netpollready(gpp **g, pd *pollDesc, mode int32) +// A particular implementation (epoll/kqueue/port/AIX/Windows) +// must define the following functions: +// +// func netpollinit() +// Initialize the poller. Only called once. +// +// func netpollopen(fd uintptr, pd *pollDesc) int32 +// Arm edge-triggered notifications for fd. The pd argument is to pass +// back to netpollready when fd is ready. Return an errno value. +// +// func netpoll(delta int64) gList +// Poll the network. If delta < 0, block indefinitely. If delta == 0, +// poll without blocking. If delta > 0, block for up to delta nanoseconds. +// Return a list of goroutines built by calling netpollready. +// +// func netpollBreak() +// Wake up the network poller, assumed to be blocked in netpoll. +// +// func netpollIsPollDescriptor(fd uintptr) bool +// Reports whether fd is a file descriptor used by the poller. // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer // goroutines respectively. The semaphore can be in the following states: @@ -99,14 +113,7 @@ func netpollinited() bool { // poll_runtime_isPollServerDescriptor reports whether fd is a // descriptor being used by netpoll. func poll_runtime_isPollServerDescriptor(fd uintptr) bool { - fds := netpolldescriptor() - if GOOS != "aix" { - return fd == fds - } else { - // AIX have a pipe in its netpoll implementation. - // Therefore, two fd are returned by netpolldescriptor using a mask. - return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF - } + return netpollIsPollDescriptor(fd) } //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen @@ -316,8 +323,13 @@ func poll_runtime_pollUnblock(pd *pollDesc) { } } -// make pd ready, newly runnable goroutines (if any) are added to toRun. -// May run during STW, so write barriers are not allowed. +// netpollready is called by the platform-specific netpoll function. +// It declares that the fd associated with pd is ready for I/O. +// The toRun argument is used to build a list of goroutines to return +// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate +// whether the fd is ready for reading or writing or both. +// +// This may run while the world is stopped, so write barriers are not allowed. //go:nowritebarrier func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g diff --git a/src/runtime/netpoll_aix.go b/src/runtime/netpoll_aix.go index 6feda27b805..e1512f826cb 100644 --- a/src/runtime/netpoll_aix.go +++ b/src/runtime/netpoll_aix.go @@ -63,12 +63,8 @@ func netpollinit() { pds[0] = nil } -func netpolldescriptor() uintptr { - // Both fd must be returned - if rdwake > 0xFFFF || wrwake > 0xFFFF { - throw("netpolldescriptor: invalid fd number") - } - return uintptr(rdwake<<16 | wrwake) +func netpollIsPollDescriptor(fd uintptr) bool { + return fd == uintptr(rdwake) || fd == uintptr(wrwake) } // netpollwakeup writes on wrwake to wakeup poll before any changes. @@ -132,6 +128,11 @@ func netpollarm(pd *pollDesc, mode int) { unlock(&mtxset) } +// netpollBreak interrupts an epollwait. +func netpollBreak() { + netpollwakeup() +} + // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely @@ -176,8 +177,13 @@ retry: } // Check if some descriptors need to be changed if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 { - var b [1]byte - for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 { + if delay != 0 { + // A netpollwakeup could be picked up by a + // non-blocking poll. Only clear the wakeup + // if blocking. + var b [1]byte + for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 { + } } // Do not look at the other fds in this case as the mode may have changed // XXX only additions of flags are made, so maybe it is ok diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go index 73dfb4561ea..b9dc18c939e 100644 --- a/src/runtime/netpoll_epoll.go +++ b/src/runtime/netpoll_epoll.go @@ -20,24 +20,40 @@ func closeonexec(fd int32) var ( epfd int32 = -1 // epoll descriptor + + netpollBreakRd, netpollBreakWr uintptr // for netpollBreak ) func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) - if epfd >= 0 { - return - } - epfd = epollcreate(1024) - if epfd >= 0 { + if epfd < 0 { + epfd = epollcreate(1024) + if epfd < 0 { + println("runtime: epollcreate failed with", -epfd) + throw("runtime: netpollinit failed") + } closeonexec(epfd) - return } - println("runtime: epollcreate failed with", -epfd) - throw("runtime: netpollinit failed") + r, w, errno := nonblockingPipe() + if errno != 0 { + println("runtime: pipe failed with", -errno) + throw("runtime: pipe failed") + } + ev := epollevent{ + events: _EPOLLIN, + } + *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd + errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) + if errno != 0 { + println("runtime: epollctl failed with", -errno) + throw("runtime: epollctl failed") + } + netpollBreakRd = uintptr(r) + netpollBreakWr = uintptr(w) } -func netpolldescriptor() uintptr { - return uintptr(epfd) +func netpollIsPollDescriptor(fd uintptr) bool { + return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr } func netpollopen(fd uintptr, pd *pollDesc) int32 { @@ -56,6 +72,25 @@ func netpollarm(pd *pollDesc, mode int) { throw("runtime: unused") } +// netpollBreak interrupts an epollwait. +func netpollBreak() { + for { + var b byte + n := write(netpollBreakWr, unsafe.Pointer(&b), 1) + if n == 1 { + break + } + if n == -_EINTR { + continue + } + if n == -_EAGAIN { + return + } + println("runtime: netpollBreak write failed with", -n) + throw("runtime: netpollBreak write failed") + } +} + // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely @@ -100,6 +135,22 @@ retry: if ev.events == 0 { continue } + + if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { + if ev.events != _EPOLLIN { + println("runtime: netpoll: break fd ready for", ev.events) + throw("runtime: netpoll: break fd ready for something unexpected") + } + if delay != 0 { + // netpollBreak could be picked up by a + // nonblocking poll. Only read the byte + // if blocking. + var tmp [16]byte + read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) + } + continue + } + var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' diff --git a/src/runtime/netpoll_fake.go b/src/runtime/netpoll_fake.go index 071d87ad50a..b2af3b89b2c 100644 --- a/src/runtime/netpoll_fake.go +++ b/src/runtime/netpoll_fake.go @@ -12,8 +12,8 @@ package runtime func netpollinit() { } -func netpolldescriptor() uintptr { - return ^uintptr(0) +func netpollIsPollDescriptor(fd uintptr) bool { + return false } func netpollopen(fd uintptr, pd *pollDesc) int32 { @@ -27,6 +27,9 @@ func netpollclose(fd uintptr) int32 { func netpollarm(pd *pollDesc, mode int) { } +func netpollBreak() { +} + func netpoll(delay int64) gList { return gList{} } diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go index ce8da73d1e4..54586a393d6 100644 --- a/src/runtime/netpoll_kqueue.go +++ b/src/runtime/netpoll_kqueue.go @@ -12,6 +12,8 @@ import "unsafe" var ( kq int32 = -1 + + netpollBreakRd, netpollBreakWr uintptr // for netpollBreak ) func netpollinit() { @@ -21,10 +23,27 @@ func netpollinit() { throw("runtime: netpollinit failed") } closeonexec(kq) + r, w, errno := nonblockingPipe() + if errno != 0 { + println("runtime: pipe failed with", -errno) + throw("runtime: pipe failed") + } + ev := keventt{ + filter: _EVFILT_READ, + flags: _EV_ADD, + } + *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r) + n := kevent(kq, &ev, 1, nil, 0, nil) + if n < 0 { + println("runtime: kevent failed with", -errno) + throw("runtime: kevent failed") + } + netpollBreakRd = uintptr(r) + netpollBreakWr = uintptr(w) } -func netpolldescriptor() uintptr { - return uintptr(kq) +func netpollIsPollDescriptor(fd uintptr) bool { + return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr } func netpollopen(fd uintptr, pd *pollDesc) int32 { @@ -57,6 +76,22 @@ func netpollarm(pd *pollDesc, mode int) { throw("runtime: unused") } +// netpollBreak interrupts an epollwait. +func netpollBreak() { + for { + var b byte + n := write(netpollBreakWr, unsafe.Pointer(&b), 1) + if n == 1 || n == -_EAGAIN { + break + } + if n == -_EINTR { + continue + } + println("runtime: netpollBreak write failed with", -n) + throw("runtime: netpollBreak write failed") + } +} + // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely @@ -98,6 +133,22 @@ retry: var toRun gList for i := 0; i < int(n); i++ { ev := &events[i] + + if uintptr(ev.ident) == netpollBreakRd { + if ev.filter != _EVFILT_READ { + println("runtime: netpoll: break fd ready for", ev.filter) + throw("runtime: netpoll: break fd ready for something unexpected") + } + if delay != 0 { + // netpollBreak could be picked up by a + // nonblocking poll. Only read the byte + // if blocking. + var tmp [16]byte + read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) + } + continue + } + var mode int32 switch ev.filter { case _EVFILT_READ: diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go index ad41ab5af21..fac4829ed1b 100644 --- a/src/runtime/netpoll_solaris.go +++ b/src/runtime/netpoll_solaris.go @@ -71,17 +71,20 @@ import "unsafe" //go:cgo_import_dynamic libc_port_associate port_associate "libc.so" //go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so" //go:cgo_import_dynamic libc_port_getn port_getn "libc.so" +//go:cgo_import_dynamic libc_port_alert port_alert "libc.so" //go:linkname libc_port_create libc_port_create //go:linkname libc_port_associate libc_port_associate //go:linkname libc_port_dissociate libc_port_dissociate //go:linkname libc_port_getn libc_port_getn +//go:linkname libc_port_alert libc_port_alert var ( libc_port_create, libc_port_associate, libc_port_dissociate, - libc_port_getn libcFunc + libc_port_getn, + libc_port_alert libcFunc ) func errno() int32 { @@ -108,6 +111,10 @@ func port_getn(port int32, evs *portevent, max uint32, nget *uint32, timeout *ti return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout)))) } +func port_alert(port int32, flags, events uint32, user uintptr) int32 { + return int32(sysvicall4(&libc_port_alert, uintptr(port), uintptr(flags), uintptr(events), user)) +} + var portfd int32 = -1 func netpollinit() { @@ -121,8 +128,8 @@ func netpollinit() { throw("runtime: netpollinit failed") } -func netpolldescriptor() uintptr { - return uintptr(portfd) +func netpollIsPollDescriptor(fd uintptr) bool { + return fd == uintptr(portfd) } func netpollopen(fd uintptr, pd *pollDesc) int32 { @@ -178,6 +185,21 @@ func netpollarm(pd *pollDesc, mode int) { unlock(&pd.lock) } +// netpollBreak interrupts a port_getn wait. +func netpollBreak() { + // Use port_alert to put portfd into alert mode. + // This will wake up all threads sleeping in port_getn on portfd, + // and cause their calls to port_getn to return immediately. + // Further, until portfd is taken out of alert mode, + // all calls to port_getn will return immediately. + if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 { + if e := errno(); e != _EBUSY { + println("runtime: port_alert failed with", e) + throw("runtime: netpoll: port_alert failed") + } + } +} + // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely @@ -224,6 +246,24 @@ retry: for i := 0; i < int(n); i++ { ev := &events[i] + if ev.portev_source == _PORT_SOURCE_ALERT { + if ev.portev_events != _POLLHUP || unsafe.Pointer(ev.portev_user) != unsafe.Pointer(&portfd) { + throw("runtime: netpoll: bad port_alert wakeup") + } + if delay != 0 { + // Now that a blocking call to netpoll + // has seen the alert, take portfd + // back out of alert mode. + // See the comment in netpollBreak. + if port_alert(portfd, 0, 0, 0) < 0 { + e := errno() + println("runtime: port_alert failed with", e) + throw("runtime: netpoll: port_alert failed") + } + } + continue + } + if ev.portev_events == 0 { continue } diff --git a/src/runtime/netpoll_stub.go b/src/runtime/netpoll_stub.go index 3437a274915..00c06a440b7 100644 --- a/src/runtime/netpoll_stub.go +++ b/src/runtime/netpoll_stub.go @@ -6,13 +6,30 @@ package runtime +import "runtime/internal/atomic" + var netpollWaiters uint32 +var netpollStubLock mutex +var netpollNote note +var netpollBroken uint32 + +func netpollBreak() { + if atomic.Cas(&netpollBroken, 0, 1) { + notewakeup(&netpollNote) + } +} + // Polls for ready network connections. // Returns list of goroutines that become runnable. func netpoll(delay int64) gList { // Implementation for platforms that do not support // integrated network poller. + if delay != 0 { + noteclear(&netpollNote) + atomic.Store(&netpollBroken, 0) + notetsleep(&netpollNote, delay) + } return gList{} } diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go index fde413677a9..ced52cbd3a3 100644 --- a/src/runtime/netpoll_windows.go +++ b/src/runtime/netpoll_windows.go @@ -41,8 +41,8 @@ func netpollinit() { } } -func netpolldescriptor() uintptr { - return iocphandle +func netpollIsPollDescriptor(fd uintptr) bool { + return fd == iocphandle } func netpollopen(fd uintptr, pd *pollDesc) int32 { @@ -61,6 +61,13 @@ func netpollarm(pd *pollDesc, mode int) { throw("runtime: unused") } +func netpollBreak() { + if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 { + println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")") + throw("runtime: netpoll: PostQueuedCompletionStatus failed") + } +} + // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely @@ -112,12 +119,20 @@ func netpoll(delay int64) gList { mp.blocked = false for i = 0; i < n; i++ { op = entries[i].op - errno = 0 - qty = 0 - if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 { - errno = int32(getlasterror()) + if op != nil { + errno = 0 + qty = 0 + if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 { + errno = int32(getlasterror()) + } + handlecompletion(&toRun, op, errno, qty) + } else { + if delay == 0 { + // Forward the notification to the + // blocked poller. + netpollBreak() + } } - handlecompletion(&toRun, op, errno, qty) } } else { op = nil @@ -139,6 +154,14 @@ func netpoll(delay int64) gList { // dequeued failed IO packet, so report that } mp.blocked = false + if op == nil { + if delay == 0 { + // Forward the notification to the + // blocked poller. + netpollBreak() + } + return gList{} + } handlecompletion(&toRun, op, errno, qty) } return toRun diff --git a/src/runtime/os_netbsd.go b/src/runtime/os_netbsd.go index 17742207661..3cb9411a9cd 100644 --- a/src/runtime/os_netbsd.go +++ b/src/runtime/os_netbsd.go @@ -24,8 +24,6 @@ const ( // From _LWP_DETACHED = 0x00000040 - - _EAGAIN = 35 ) type mOS struct { diff --git a/src/runtime/os_openbsd.go b/src/runtime/os_openbsd.go index be887a549d7..351a99f7e9f 100644 --- a/src/runtime/os_openbsd.go +++ b/src/runtime/os_openbsd.go @@ -65,7 +65,6 @@ func setNonblock(fd int32) const ( _ESRCH = 3 - _EAGAIN = 35 _EWOULDBLOCK = _EAGAIN _ENOTSUP = 91 diff --git a/src/runtime/os_windows.go b/src/runtime/os_windows.go index 34d0627fcbf..764db6edb0c 100644 --- a/src/runtime/os_windows.go +++ b/src/runtime/os_windows.go @@ -34,6 +34,7 @@ const ( //go:cgo_import_dynamic runtime._GetThreadContext GetThreadContext%2 "kernel32.dll" //go:cgo_import_dynamic runtime._LoadLibraryW LoadLibraryW%1 "kernel32.dll" //go:cgo_import_dynamic runtime._LoadLibraryA LoadLibraryA%1 "kernel32.dll" +//go:cgo_import_dynamic runtime._PostQueuedCompletionStatus PostQueuedCompletionStatus%4 "kernel32.dll" //go:cgo_import_dynamic runtime._ResumeThread ResumeThread%1 "kernel32.dll" //go:cgo_import_dynamic runtime._SetConsoleCtrlHandler SetConsoleCtrlHandler%2 "kernel32.dll" //go:cgo_import_dynamic runtime._SetErrorMode SetErrorMode%1 "kernel32.dll" @@ -80,6 +81,7 @@ var ( _GetThreadContext, _LoadLibraryW, _LoadLibraryA, + _PostQueuedCompletionStatus, _QueryPerformanceCounter, _QueryPerformanceFrequency, _ResumeThread, diff --git a/src/runtime/proc_test.go b/src/runtime/proc_test.go index 6e6272e80a2..3a1bf91fa5d 100644 --- a/src/runtime/proc_test.go +++ b/src/runtime/proc_test.go @@ -981,3 +981,42 @@ func TestPreemptionAfterSyscall(t *testing.T) { func TestGetgThreadSwitch(t *testing.T) { runtime.RunGetgThreadSwitchTest() } + +// TestNetpollBreak tests that netpollBreak can break a netpoll. +// This test is not particularly safe since the call to netpoll +// will pick up any stray files that are ready, but it should work +// OK as long it is not run in parallel. +func TestNetpollBreak(t *testing.T) { + if runtime.GOMAXPROCS(0) == 1 { + t.Skip("skipping: GOMAXPROCS=1") + } + + // Make sure that netpoll is initialized. + time.Sleep(1) + + start := time.Now() + c := make(chan bool, 2) + go func() { + c <- true + runtime.Netpoll(10 * time.Second.Nanoseconds()) + c <- true + }() + <-c + // Loop because the break might get eaten by the scheduler. + // Break twice to break both the netpoll we started and the + // scheduler netpoll. +loop: + for { + runtime.Usleep(100) + runtime.NetpollBreak() + runtime.NetpollBreak() + select { + case <-c: + break loop + default: + } + } + if dur := time.Since(start); dur > 5*time.Second { + t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur) + } +}