1
0
mirror of https://github.com/golang/go synced 2024-11-17 06:54:48 -07:00

internal/poll: assume we have CancelIoEX on Windows

As of the Go 1.11 release we require at least Windows 7, so CancelIoEx
is always available.  This lets us simplify the code to not require
dedicated threads to handle I/O requests.

Fixes #37956

Change-Id: If1dc4ac4acb61c43e4f2a9f26f225869050262a5
Reviewed-on: https://go-review.googlesource.com/c/go/+/225060
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Alex Brainman <alex.brainman@gmail.com>
This commit is contained in:
Ian Lance Taylor 2020-03-23 21:11:01 -07:00
parent 825ae71e56
commit 191118a821
2 changed files with 23 additions and 100 deletions

View File

@ -9,7 +9,6 @@ import (
"internal/race"
"internal/syscall/windows"
"io"
"runtime"
"sync"
"syscall"
"unicode/utf16"
@ -22,18 +21,6 @@ var (
ioSync uint64
)
// CancelIo Windows API cancels all outstanding IO for a particular
// socket on current thread. To overcome that limitation, we run
// special goroutine, locked to OS single thread, that both starts
// and cancels IO. It means, there are 2 unavoidable thread switches
// for every IO.
// Some newer versions of Windows has new CancelIoEx API, that does
// not have that limitation and can be used from any thread. This
// package uses CancelIoEx API, if present, otherwise it fallback
// to CancelIo.
var canCancelIO bool // determines if CancelIoEx API is present
// This package uses the SetFileCompletionNotificationModes Windows
// API to skip calling GetQueuedCompletionStatus if an IO operation
// completes synchronously. There is a known bug where
@ -72,7 +59,6 @@ func init() {
if e != nil {
initErr = e
}
canCancelIO = syscall.LoadCancelIoEx() == nil
checkSetFileCompletionNotificationModes()
}
@ -90,7 +76,6 @@ type operation struct {
// fields used only by net package
fd *FD
errc chan error
buf syscall.WSABuf
msg windows.WSAMsg
sa syscall.Sockaddr
@ -155,46 +140,15 @@ func (o *operation) InitMsg(p []byte, oob []byte) {
}
}
// ioSrv executes net IO requests.
type ioSrv struct {
req chan ioSrvReq
}
type ioSrvReq struct {
o *operation
submit func(o *operation) error // if nil, cancel the operation
}
// ProcessRemoteIO will execute submit IO requests on behalf
// of other goroutines, all on a single os thread, so it can
// cancel them later. Results of all operations will be sent
// back to their requesters via channel supplied in request.
// It is used only when the CancelIoEx API is unavailable.
func (s *ioSrv) ProcessRemoteIO() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
for r := range s.req {
if r.submit != nil {
r.o.errc <- r.submit(r.o)
} else {
r.o.errc <- syscall.CancelIo(r.o.fd.Sysfd)
}
}
}
// ExecIO executes a single IO operation o. It submits and cancels
// execIO executes a single IO operation o. It submits and cancels
// IO in the current thread for systems where Windows CancelIoEx API
// is available. Alternatively, it passes the request onto
// runtime netpoll and waits for completion or cancels request.
func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, error) {
func execIO(o *operation, submit func(o *operation) error) (int, error) {
if o.fd.pd.runtimeCtx == 0 {
return 0, errors.New("internal error: polling on unsupported descriptor type")
}
if !canCancelIO {
onceStartServer.Do(startServer)
}
fd := o.fd
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(int(o.mode), fd.isFile)
@ -202,14 +156,7 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
return 0, err
}
// Start IO.
if canCancelIO {
err = submit(o)
} else {
// Send request to a special dedicated thread,
// so it can stop the IO with CancelIO later.
s.req <- ioSrvReq{o, submit}
err = <-o.errc
}
err = submit(o)
switch err {
case nil:
// IO completed immediately
@ -247,16 +194,11 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
panic("unexpected runtime.netpoll error: " + netpollErr.Error())
}
// Cancel our request.
if canCancelIO {
err := syscall.CancelIoEx(fd.Sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
} else {
s.req <- ioSrvReq{o, nil}
<-o.errc
err = syscall.CancelIoEx(fd.Sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
// Wait for cancellation to complete.
fd.pd.waitCanceled(int(o.mode))
@ -273,21 +215,6 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
return int(o.qty), nil
}
// Start helper goroutines.
var rsrv, wsrv ioSrv
var onceStartServer sync.Once
func startServer() {
// This is called, once, when only the CancelIo API is available.
// Start two special goroutines, both locked to an OS thread,
// that start and cancel IO requests.
// One will process read requests, while the other will do writes.
rsrv.req = make(chan ioSrvReq)
go rsrv.ProcessRemoteIO()
wsrv.req = make(chan ioSrvReq)
go wsrv.ProcessRemoteIO()
}
// FD is a file descriptor. The net and os packages embed this type in
// a larger type representing a network connection or OS file.
type FD struct {
@ -385,9 +312,9 @@ func (fd *FD) Init(net string, pollable bool) (string, error) {
// if the user is doing their own overlapped I/O.
// See issue #21172.
//
// In general the code below avoids calling the ExecIO
// method for non-network sockets. If some method does
// somehow call ExecIO, then ExecIO, and therefore the
// In general the code below avoids calling the execIO
// function for non-network sockets. If some method does
// somehow call execIO, then execIO, and therefore the
// calling method, will return an error, because
// fd.pd.runtimeCtx will be 0.
err = fd.pd.init(fd)
@ -429,10 +356,6 @@ func (fd *FD) Init(net string, pollable bool) (string, error) {
fd.wop.fd = fd
fd.rop.runtimeCtx = fd.pd.runtimeCtx
fd.wop.runtimeCtx = fd.pd.runtimeCtx
if !canCancelIO {
fd.rop.errc = make(chan error)
fd.wop.errc = make(chan error)
}
return "", nil
}
@ -515,7 +438,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
} else {
o := &fd.rop
o.InitBuf(buf)
n, err = rsrv.ExecIO(o, func(o *operation) error {
n, err = execIO(o, func(o *operation) error {
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if race.Enabled {
@ -655,7 +578,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
defer fd.readUnlock()
o := &fd.rop
o.InitBuf(buf)
n, err := rsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
if o.rsa == nil {
o.rsa = new(syscall.RawSockaddrAny)
}
@ -711,7 +634,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
}
o := &fd.wop
o.InitBuf(b)
n, err = wsrv.ExecIO(o, func(o *operation) error {
n, err = execIO(o, func(o *operation) error {
return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
})
}
@ -820,7 +743,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) {
}
o := &fd.wop
o.InitBufs(buf)
n, err := wsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil)
})
o.ClearBufs()
@ -841,7 +764,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
o := &fd.wop
o.InitBuf(buf)
o.sa = sa
n, err := wsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
})
return n, err
@ -856,7 +779,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
o := &fd.wop
o.InitBuf(b)
o.sa = sa
n, err := wsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
})
ntotal += int(n)
@ -874,7 +797,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
o := &fd.wop
o.sa = ra
_, err := wsrv.ExecIO(o, func(o *operation) error {
_, err := execIO(o, func(o *operation) error {
return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o)
})
return err
@ -884,7 +807,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *ope
// Submit accept request.
o.handle = s
o.rsan = int32(unsafe.Sizeof(rawsa[0]))
_, err := rsrv.ExecIO(o, func(o *operation) error {
_, err := execIO(o, func(o *operation) error {
return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
})
if err != nil {
@ -1008,7 +931,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
if !fd.IsStream {
o.flags |= windows.MSG_PEEK
}
_, err := rsrv.ExecIO(o, func(o *operation) error {
_, err := execIO(o, func(o *operation) error {
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if err == windows.WSAEMSGSIZE {
@ -1078,7 +1001,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte) (int, int, int, syscall.Sockaddr, er
o.rsa = new(syscall.RawSockaddrAny)
o.msg.Name = o.rsa
o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa))
n, err := rsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil)
})
err = fd.eofError(n, err)
@ -1110,7 +1033,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err
o.msg.Name = (*syscall.RawSockaddrAny)(rsa)
o.msg.Namelen = len
}
n, err := wsrv.ExecIO(o, func(o *operation) error {
n, err := execIO(o, func(o *operation) error {
return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil)
})
return n, int(o.msg.Control.Len), err

View File

@ -57,7 +57,7 @@ func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) {
o.o.Offset = uint32(curpos)
o.o.OffsetHigh = uint32(curpos >> 32)
nw, err := wsrv.ExecIO(o, func(o *operation) error {
nw, err := execIO(o, func(o *operation) error {
return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
})
if err != nil {