1
0
mirror of https://github.com/golang/go synced 2024-11-23 05:20:11 -07:00

runtime: add netpollBreak

The new netpollBreak function can be used to interrupt a blocking netpoll.
This function is not currently used; it will be used by later CLs.

Updates #27707

Change-Id: I5cb936609ba13c3c127ea1368a49194fc58c9f4d
Reviewed-on: https://go-review.googlesource.com/c/go/+/171824
Run-TryBot: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
This commit is contained in:
Ian Lance Taylor 2019-04-05 15:53:12 -07:00
parent 33425ab8db
commit 50f4896b72
34 changed files with 322 additions and 53 deletions

View File

@ -6,6 +6,7 @@ package runtime
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -6,6 +6,7 @@ package runtime
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -6,6 +6,7 @@ package runtime
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -6,6 +6,7 @@ package runtime
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -8,6 +8,7 @@ const (
_EBADF = 0x9 _EBADF = 0x9
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0xb _EAGAIN = 0xb
_EBUSY = 0x10
_ETIME = 0x3e _ETIME = 0x3e
_ETIMEDOUT = 0x91 _ETIMEDOUT = 0x91
_EWOULDBLOCK = 0xb _EWOULDBLOCK = 0xb
@ -100,7 +101,9 @@ const (
_POLLHUP = 0x10 _POLLHUP = 0x10
_POLLERR = 0x8 _POLLERR = 0x8
_PORT_SOURCE_FD = 0x4 _PORT_SOURCE_FD = 0x4
_PORT_SOURCE_ALERT = 0x5
_PORT_ALERT_UPDATE = 0x2
) )
type semt struct { type semt struct {

View File

@ -30,6 +30,7 @@ import "C"
const ( const (
EINTR = C.EINTR EINTR = C.EINTR
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
ETIMEDOUT = C.ETIMEDOUT ETIMEDOUT = C.ETIMEDOUT
PROT_NONE = C.PROT_NONE PROT_NONE = C.PROT_NONE

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ETIMEDOUT = 0x3c _ETIMEDOUT = 0x3c
_PROT_NONE = 0x0 _PROT_NONE = 0x0

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ETIMEDOUT = 0x3c _ETIMEDOUT = 0x3c
_PROT_NONE = 0x0 _PROT_NONE = 0x0

View File

@ -10,6 +10,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ETIMEDOUT = 0x3c _ETIMEDOUT = 0x3c
_PROT_NONE = 0x0 _PROT_NONE = 0x0

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ETIMEDOUT = 0x3c _ETIMEDOUT = 0x3c
_PROT_NONE = 0x0 _PROT_NONE = 0x0

View File

@ -47,6 +47,7 @@ const (
const ( const (
EINTR = C.EINTR EINTR = C.EINTR
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK O_NONBLOCK = C.O_NONBLOCK

View File

@ -15,6 +15,7 @@ const (
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -15,6 +15,7 @@ const (
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -15,6 +15,7 @@ const (
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -32,6 +32,7 @@ import "C"
const ( const (
EINTR = C.EINTR EINTR = C.EINTR
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK O_NONBLOCK = C.O_NONBLOCK

View File

@ -28,6 +28,7 @@ import "C"
const ( const (
EINTR = C.EINTR EINTR = C.EINTR
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK O_NONBLOCK = C.O_NONBLOCK

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -8,6 +8,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -5,6 +5,7 @@ import "unsafe"
const ( const (
_EINTR = 0x4 _EINTR = 0x4
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0x23
_ENOSYS = 0x4e _ENOSYS = 0x4e
_O_NONBLOCK = 0x4 _O_NONBLOCK = 0x4

View File

@ -38,6 +38,7 @@ const (
EBADF = C.EBADF EBADF = C.EBADF
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN EAGAIN = C.EAGAIN
EBUSY = C.EBUSY
ETIME = C.ETIME ETIME = C.ETIME
ETIMEDOUT = C.ETIMEDOUT ETIMEDOUT = C.ETIMEDOUT
EWOULDBLOCK = C.EWOULDBLOCK EWOULDBLOCK = C.EWOULDBLOCK
@ -129,7 +130,9 @@ const (
POLLHUP = C.POLLHUP POLLHUP = C.POLLHUP
POLLERR = C.POLLERR POLLERR = C.POLLERR
PORT_SOURCE_FD = C.PORT_SOURCE_FD PORT_SOURCE_FD = C.PORT_SOURCE_FD
PORT_SOURCE_ALERT = C.PORT_SOURCE_ALERT
PORT_ALERT_UPDATE = C.PORT_ALERT_UPDATE
) )
type SemT C.sem_t type SemT C.sem_t

View File

@ -35,6 +35,9 @@ var Atoi = atoi
var Atoi32 = atoi32 var Atoi32 = atoi32
var Nanotime = nanotime var Nanotime = nanotime
var Netpoll = netpoll
var NetpollBreak = netpollBreak
var Usleep = usleep
var PhysHugePageSize = physHugePageSize var PhysHugePageSize = physHugePageSize

View File

@ -12,12 +12,26 @@ import (
) )
// Integrated network poller (platform-independent part). // Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions: // A particular implementation (epoll/kqueue/port/AIX/Windows)
// func netpollinit() // to initialize the poller // must define the following functions:
// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications //
// and associate fd with pd. // func netpollinit()
// An implementation must call the following function to denote that the pd is ready. // Initialize the poller. Only called once.
// func netpollready(gpp **g, pd *pollDesc, mode int32) //
// func netpollopen(fd uintptr, pd *pollDesc) int32
// Arm edge-triggered notifications for fd. The pd argument is to pass
// back to netpollready when fd is ready. Return an errno value.
//
// func netpoll(delta int64) gList
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
// Reports whether fd is a file descriptor used by the poller.
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states: // goroutines respectively. The semaphore can be in the following states:
@ -99,14 +113,7 @@ func netpollinited() bool {
// poll_runtime_isPollServerDescriptor reports whether fd is a // poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll. // descriptor being used by netpoll.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool { func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
fds := netpolldescriptor() return netpollIsPollDescriptor(fd)
if GOOS != "aix" {
return fd == fds
} else {
// AIX have a pipe in its netpoll implementation.
// Therefore, two fd are returned by netpolldescriptor using a mask.
return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF
}
} }
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
@ -316,8 +323,13 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
} }
} }
// make pd ready, newly runnable goroutines (if any) are added to toRun. // netpollready is called by the platform-specific netpoll function.
// May run during STW, so write barriers are not allowed. // It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier //go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) { func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g var rg, wg *g

View File

@ -63,12 +63,8 @@ func netpollinit() {
pds[0] = nil pds[0] = nil
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
// Both fd must be returned return fd == uintptr(rdwake) || fd == uintptr(wrwake)
if rdwake > 0xFFFF || wrwake > 0xFFFF {
throw("netpolldescriptor: invalid fd number")
}
return uintptr(rdwake<<16 | wrwake)
} }
// netpollwakeup writes on wrwake to wakeup poll before any changes. // netpollwakeup writes on wrwake to wakeup poll before any changes.
@ -132,6 +128,11 @@ func netpollarm(pd *pollDesc, mode int) {
unlock(&mtxset) unlock(&mtxset)
} }
// netpollBreak interrupts an epollwait.
func netpollBreak() {
netpollwakeup()
}
// netpoll checks for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely // delay < 0: blocks indefinitely
@ -176,8 +177,13 @@ retry:
} }
// Check if some descriptors need to be changed // Check if some descriptors need to be changed
if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 { if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
var b [1]byte if delay != 0 {
for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 { // A netpollwakeup could be picked up by a
// non-blocking poll. Only clear the wakeup
// if blocking.
var b [1]byte
for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
}
} }
// Do not look at the other fds in this case as the mode may have changed // Do not look at the other fds in this case as the mode may have changed
// XXX only additions of flags are made, so maybe it is ok // XXX only additions of flags are made, so maybe it is ok

View File

@ -20,24 +20,40 @@ func closeonexec(fd int32)
var ( var (
epfd int32 = -1 // epoll descriptor epfd int32 = -1 // epoll descriptor
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
) )
func netpollinit() { func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 { if epfd < 0 {
return epfd = epollcreate(1024)
} if epfd < 0 {
epfd = epollcreate(1024) println("runtime: epollcreate failed with", -epfd)
if epfd >= 0 { throw("runtime: netpollinit failed")
}
closeonexec(epfd) closeonexec(epfd)
return
} }
println("runtime: epollcreate failed with", -epfd) r, w, errno := nonblockingPipe()
throw("runtime: netpollinit failed") if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
return uintptr(epfd) return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr
} }
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
@ -56,6 +72,25 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
// netpollBreak interrupts an epollwait.
func netpollBreak() {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
// netpoll checks for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely // delay < 0: blocks indefinitely
@ -100,6 +135,22 @@ retry:
if ev.events == 0 { if ev.events == 0 {
continue continue
} }
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
}
continue
}
var mode int32 var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r' mode += 'r'

View File

@ -12,8 +12,8 @@ package runtime
func netpollinit() { func netpollinit() {
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
return ^uintptr(0) return false
} }
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
@ -27,6 +27,9 @@ func netpollclose(fd uintptr) int32 {
func netpollarm(pd *pollDesc, mode int) { func netpollarm(pd *pollDesc, mode int) {
} }
func netpollBreak() {
}
func netpoll(delay int64) gList { func netpoll(delay int64) gList {
return gList{} return gList{}
} }

View File

@ -12,6 +12,8 @@ import "unsafe"
var ( var (
kq int32 = -1 kq int32 = -1
netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
) )
func netpollinit() { func netpollinit() {
@ -21,10 +23,27 @@ func netpollinit() {
throw("runtime: netpollinit failed") throw("runtime: netpollinit failed")
} }
closeonexec(kq) closeonexec(kq)
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := keventt{
filter: _EVFILT_READ,
flags: _EV_ADD,
}
*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
n := kevent(kq, &ev, 1, nil, 0, nil)
if n < 0 {
println("runtime: kevent failed with", -errno)
throw("runtime: kevent failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
return uintptr(kq) return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
} }
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
@ -57,6 +76,22 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
// netpollBreak interrupts an epollwait.
func netpollBreak() {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 || n == -_EAGAIN {
break
}
if n == -_EINTR {
continue
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
// netpoll checks for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely // delay < 0: blocks indefinitely
@ -98,6 +133,22 @@ retry:
var toRun gList var toRun gList
for i := 0; i < int(n); i++ { for i := 0; i < int(n); i++ {
ev := &events[i] ev := &events[i]
if uintptr(ev.ident) == netpollBreakRd {
if ev.filter != _EVFILT_READ {
println("runtime: netpoll: break fd ready for", ev.filter)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
}
continue
}
var mode int32 var mode int32
switch ev.filter { switch ev.filter {
case _EVFILT_READ: case _EVFILT_READ:

View File

@ -71,17 +71,20 @@ import "unsafe"
//go:cgo_import_dynamic libc_port_associate port_associate "libc.so" //go:cgo_import_dynamic libc_port_associate port_associate "libc.so"
//go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so" //go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so"
//go:cgo_import_dynamic libc_port_getn port_getn "libc.so" //go:cgo_import_dynamic libc_port_getn port_getn "libc.so"
//go:cgo_import_dynamic libc_port_alert port_alert "libc.so"
//go:linkname libc_port_create libc_port_create //go:linkname libc_port_create libc_port_create
//go:linkname libc_port_associate libc_port_associate //go:linkname libc_port_associate libc_port_associate
//go:linkname libc_port_dissociate libc_port_dissociate //go:linkname libc_port_dissociate libc_port_dissociate
//go:linkname libc_port_getn libc_port_getn //go:linkname libc_port_getn libc_port_getn
//go:linkname libc_port_alert libc_port_alert
var ( var (
libc_port_create, libc_port_create,
libc_port_associate, libc_port_associate,
libc_port_dissociate, libc_port_dissociate,
libc_port_getn libcFunc libc_port_getn,
libc_port_alert libcFunc
) )
func errno() int32 { func errno() int32 {
@ -108,6 +111,10 @@ func port_getn(port int32, evs *portevent, max uint32, nget *uint32, timeout *ti
return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout)))) return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout))))
} }
func port_alert(port int32, flags, events uint32, user uintptr) int32 {
return int32(sysvicall4(&libc_port_alert, uintptr(port), uintptr(flags), uintptr(events), user))
}
var portfd int32 = -1 var portfd int32 = -1
func netpollinit() { func netpollinit() {
@ -121,8 +128,8 @@ func netpollinit() {
throw("runtime: netpollinit failed") throw("runtime: netpollinit failed")
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
return uintptr(portfd) return fd == uintptr(portfd)
} }
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
@ -178,6 +185,21 @@ func netpollarm(pd *pollDesc, mode int) {
unlock(&pd.lock) unlock(&pd.lock)
} }
// netpollBreak interrupts a port_getn wait.
func netpollBreak() {
// Use port_alert to put portfd into alert mode.
// This will wake up all threads sleeping in port_getn on portfd,
// and cause their calls to port_getn to return immediately.
// Further, until portfd is taken out of alert mode,
// all calls to port_getn will return immediately.
if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
if e := errno(); e != _EBUSY {
println("runtime: port_alert failed with", e)
throw("runtime: netpoll: port_alert failed")
}
}
}
// netpoll checks for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely // delay < 0: blocks indefinitely
@ -224,6 +246,24 @@ retry:
for i := 0; i < int(n); i++ { for i := 0; i < int(n); i++ {
ev := &events[i] ev := &events[i]
if ev.portev_source == _PORT_SOURCE_ALERT {
if ev.portev_events != _POLLHUP || unsafe.Pointer(ev.portev_user) != unsafe.Pointer(&portfd) {
throw("runtime: netpoll: bad port_alert wakeup")
}
if delay != 0 {
// Now that a blocking call to netpoll
// has seen the alert, take portfd
// back out of alert mode.
// See the comment in netpollBreak.
if port_alert(portfd, 0, 0, 0) < 0 {
e := errno()
println("runtime: port_alert failed with", e)
throw("runtime: netpoll: port_alert failed")
}
}
continue
}
if ev.portev_events == 0 { if ev.portev_events == 0 {
continue continue
} }

View File

@ -6,13 +6,30 @@
package runtime package runtime
import "runtime/internal/atomic"
var netpollWaiters uint32 var netpollWaiters uint32
var netpollStubLock mutex
var netpollNote note
var netpollBroken uint32
func netpollBreak() {
if atomic.Cas(&netpollBroken, 0, 1) {
notewakeup(&netpollNote)
}
}
// Polls for ready network connections. // Polls for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
func netpoll(delay int64) gList { func netpoll(delay int64) gList {
// Implementation for platforms that do not support // Implementation for platforms that do not support
// integrated network poller. // integrated network poller.
if delay != 0 {
noteclear(&netpollNote)
atomic.Store(&netpollBroken, 0)
notetsleep(&netpollNote, delay)
}
return gList{} return gList{}
} }

View File

@ -41,8 +41,8 @@ func netpollinit() {
} }
} }
func netpolldescriptor() uintptr { func netpollIsPollDescriptor(fd uintptr) bool {
return iocphandle return fd == iocphandle
} }
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
@ -61,6 +61,13 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
func netpollBreak() {
if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
throw("runtime: netpoll: PostQueuedCompletionStatus failed")
}
}
// netpoll checks for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely // delay < 0: blocks indefinitely
@ -112,12 +119,20 @@ func netpoll(delay int64) gList {
mp.blocked = false mp.blocked = false
for i = 0; i < n; i++ { for i = 0; i < n; i++ {
op = entries[i].op op = entries[i].op
errno = 0 if op != nil {
qty = 0 errno = 0
if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 { qty = 0
errno = int32(getlasterror()) if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
errno = int32(getlasterror())
}
handlecompletion(&toRun, op, errno, qty)
} else {
if delay == 0 {
// Forward the notification to the
// blocked poller.
netpollBreak()
}
} }
handlecompletion(&toRun, op, errno, qty)
} }
} else { } else {
op = nil op = nil
@ -139,6 +154,14 @@ func netpoll(delay int64) gList {
// dequeued failed IO packet, so report that // dequeued failed IO packet, so report that
} }
mp.blocked = false mp.blocked = false
if op == nil {
if delay == 0 {
// Forward the notification to the
// blocked poller.
netpollBreak()
}
return gList{}
}
handlecompletion(&toRun, op, errno, qty) handlecompletion(&toRun, op, errno, qty)
} }
return toRun return toRun

View File

@ -24,8 +24,6 @@ const (
// From <sys/lwp.h> // From <sys/lwp.h>
_LWP_DETACHED = 0x00000040 _LWP_DETACHED = 0x00000040
_EAGAIN = 35
) )
type mOS struct { type mOS struct {

View File

@ -65,7 +65,6 @@ func setNonblock(fd int32)
const ( const (
_ESRCH = 3 _ESRCH = 3
_EAGAIN = 35
_EWOULDBLOCK = _EAGAIN _EWOULDBLOCK = _EAGAIN
_ENOTSUP = 91 _ENOTSUP = 91

View File

@ -34,6 +34,7 @@ const (
//go:cgo_import_dynamic runtime._GetThreadContext GetThreadContext%2 "kernel32.dll" //go:cgo_import_dynamic runtime._GetThreadContext GetThreadContext%2 "kernel32.dll"
//go:cgo_import_dynamic runtime._LoadLibraryW LoadLibraryW%1 "kernel32.dll" //go:cgo_import_dynamic runtime._LoadLibraryW LoadLibraryW%1 "kernel32.dll"
//go:cgo_import_dynamic runtime._LoadLibraryA LoadLibraryA%1 "kernel32.dll" //go:cgo_import_dynamic runtime._LoadLibraryA LoadLibraryA%1 "kernel32.dll"
//go:cgo_import_dynamic runtime._PostQueuedCompletionStatus PostQueuedCompletionStatus%4 "kernel32.dll"
//go:cgo_import_dynamic runtime._ResumeThread ResumeThread%1 "kernel32.dll" //go:cgo_import_dynamic runtime._ResumeThread ResumeThread%1 "kernel32.dll"
//go:cgo_import_dynamic runtime._SetConsoleCtrlHandler SetConsoleCtrlHandler%2 "kernel32.dll" //go:cgo_import_dynamic runtime._SetConsoleCtrlHandler SetConsoleCtrlHandler%2 "kernel32.dll"
//go:cgo_import_dynamic runtime._SetErrorMode SetErrorMode%1 "kernel32.dll" //go:cgo_import_dynamic runtime._SetErrorMode SetErrorMode%1 "kernel32.dll"
@ -80,6 +81,7 @@ var (
_GetThreadContext, _GetThreadContext,
_LoadLibraryW, _LoadLibraryW,
_LoadLibraryA, _LoadLibraryA,
_PostQueuedCompletionStatus,
_QueryPerformanceCounter, _QueryPerformanceCounter,
_QueryPerformanceFrequency, _QueryPerformanceFrequency,
_ResumeThread, _ResumeThread,

View File

@ -981,3 +981,42 @@ func TestPreemptionAfterSyscall(t *testing.T) {
func TestGetgThreadSwitch(t *testing.T) { func TestGetgThreadSwitch(t *testing.T) {
runtime.RunGetgThreadSwitchTest() runtime.RunGetgThreadSwitchTest()
} }
// TestNetpollBreak tests that netpollBreak can break a netpoll.
// This test is not particularly safe since the call to netpoll
// will pick up any stray files that are ready, but it should work
// OK as long it is not run in parallel.
func TestNetpollBreak(t *testing.T) {
if runtime.GOMAXPROCS(0) == 1 {
t.Skip("skipping: GOMAXPROCS=1")
}
// Make sure that netpoll is initialized.
time.Sleep(1)
start := time.Now()
c := make(chan bool, 2)
go func() {
c <- true
runtime.Netpoll(10 * time.Second.Nanoseconds())
c <- true
}()
<-c
// Loop because the break might get eaten by the scheduler.
// Break twice to break both the netpoll we started and the
// scheduler netpoll.
loop:
for {
runtime.Usleep(100)
runtime.NetpollBreak()
runtime.NetpollBreak()
select {
case <-c:
break loop
default:
}
}
if dur := time.Since(start); dur > 5*time.Second {
t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
}
}