From adb384ad2cfbd742fb106b7ec2a65d4ad844c35e Mon Sep 17 00:00:00 2001 From: David du Colombier <0intro@gmail.com> Date: Fri, 11 Nov 2016 20:49:11 +0100 Subject: [PATCH] net: implement asynchonous cancelable I/O on Plan 9 This change is an experimental implementation of asynchronous cancelable I/O operations on Plan 9, which are required to implement deadlines. There are no asynchronous syscalls on Plan 9. I/O operations are performed with blocking pread and pwrite syscalls. Implementing deadlines in Go requires a way to interrupt I/O operations. It is possible to interrupt reads and writes on a TCP connection by forcing the closure of the TCP connection. This approach has been used successfully in CL 31390. However, we can't implement deadlines with this method, since we require to be able to reuse the connection after the timeout. On Plan 9, I/O operations are interrupted when the process receives a note. We can rely on this behavior to implement a more generic approach. When doing an I/O operation (read or write), we start the I/O in its own process, then wait for the result asynchronously. The process is able to handle the "hangup" note. When receiving the "hangup" note, the currently running I/O operation is canceled and the process returns. This way, deadlines can be implemented by sending an "hangup" note to the process running the blocking I/O operation, after the expiration of a timer. Fixes #11932. Fixes #17498. Change-Id: I414f72c7a9a4f9b8f9c09ed3b6c269f899d9b430 Reviewed-on: https://go-review.googlesource.com/31521 Reviewed-by: Brad Fitzpatrick --- src/net/fd_io_plan9.go | 93 ++++++++++++++++++++++++++++++++++ src/net/fd_plan9.go | 105 +++++++++++++++++++++++++++++++++++++-- src/net/tcpsock_test.go | 5 ++ src/runtime/net_plan9.go | 29 +++++++++++ src/runtime/os3_plan9.go | 3 ++ src/runtime/os_plan9.go | 1 + 6 files changed, 231 insertions(+), 5 deletions(-) create mode 100644 src/net/fd_io_plan9.go create mode 100644 src/runtime/net_plan9.go diff --git a/src/net/fd_io_plan9.go b/src/net/fd_io_plan9.go new file mode 100644 index 00000000000..76da0c546cf --- /dev/null +++ b/src/net/fd_io_plan9.go @@ -0,0 +1,93 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "os" + "runtime" + "sync" + "syscall" +) + +// asyncIO implements asynchronous cancelable I/O. +// An asyncIO represents a single asynchronous Read or Write +// operation. The result is returned on the result channel. +// The undergoing I/O system call can either complete or be +// interrupted by a note. +type asyncIO struct { + res chan result + + // mu guards the pid field. + mu sync.Mutex + + // pid holds the process id of + // the process running the IO operation. + pid int +} + +// result is the return value of a Read or Write operation. +type result struct { + n int + err error +} + +// newAsyncIO returns a new asyncIO that performs an I/O +// operation by calling fn, which must do one and only one +// interruptible system call. +func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO { + aio := &asyncIO{ + res: make(chan result, 0), + } + aio.mu.Lock() + go func() { + // Lock the current goroutine to its process + // and store the pid in io so that Cancel can + // interrupt it. We ignore the "hangup" signal, + // so the signal does not take down the entire + // Go runtime. + runtime.LockOSThread() + runtime_ignoreHangup() + aio.pid = os.Getpid() + aio.mu.Unlock() + + n, err := fn(b) + + aio.mu.Lock() + aio.pid = -1 + runtime_unignoreHangup() + aio.mu.Unlock() + + aio.res <- result{n, err} + }() + return aio +} + +var hangupNote os.Signal = syscall.Note("hangup") + +// Cancel interrupts the I/O operation, causing +// the Wait function to return. +func (aio *asyncIO) Cancel() { + aio.mu.Lock() + defer aio.mu.Unlock() + if aio.pid == -1 { + return + } + proc, err := os.FindProcess(aio.pid) + if err != nil { + return + } + proc.Signal(hangupNote) +} + +// Wait for the I/O operation to complete. +func (aio *asyncIO) Wait() (int, error) { + res := <-aio.res + return res.n, res.err +} + +// The following functions, provided by the runtime, are used to +// ignore and unignore the "hangup" signal received by the process. +func runtime_ignoreHangup() +func runtime_unignoreHangup() diff --git a/src/net/fd_plan9.go b/src/net/fd_plan9.go index ab5db38dbec..300d8c4543e 100644 --- a/src/net/fd_plan9.go +++ b/src/net/fd_plan9.go @@ -7,10 +7,17 @@ package net import ( "io" "os" + "sync/atomic" "syscall" "time" ) +type atomicBool int32 + +func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } +func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } +func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } + // Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods @@ -23,6 +30,14 @@ type netFD struct { listen, ctl, data *os.File laddr, raddr Addr isStream bool + + // deadlines + raio *asyncIO + waio *asyncIO + rtimer *time.Timer + wtimer *time.Timer + rtimedout atomicBool // set true when read deadline has been reached + wtimedout atomicBool // set true when write deadline has been reached } var ( @@ -84,6 +99,9 @@ func (fd *netFD) destroy() { } func (fd *netFD) Read(b []byte) (n int, err error) { + if fd.rtimedout.isSet() { + return 0, errTimeout + } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } @@ -94,10 +112,15 @@ func (fd *netFD) Read(b []byte) (n int, err error) { if len(b) == 0 { return 0, nil } - n, err = fd.data.Read(b) + fd.raio = newAsyncIO(fd.data.Read, b) + n, err = fd.raio.Wait() + fd.raio = nil if isHangup(err) { err = io.EOF } + if isInterrupted(err) { + err = errTimeout + } if fd.net == "udp" && err == io.EOF { n = 0 err = nil @@ -106,6 +129,9 @@ func (fd *netFD) Read(b []byte) (n int, err error) { } func (fd *netFD) Write(b []byte) (n int, err error) { + if fd.wtimedout.isSet() { + return 0, errTimeout + } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } @@ -113,7 +139,13 @@ func (fd *netFD) Write(b []byte) (n int, err error) { return 0, err } defer fd.writeUnlock() - return fd.data.Write(b) + fd.waio = newAsyncIO(fd.data.Write, b) + n, err = fd.waio.Wait() + fd.waio = nil + if isInterrupted(err) { + err = errTimeout + } + return } func (fd *netFD) closeRead() error { @@ -185,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) { } func (fd *netFD) setDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'r'+'w') } func (fd *netFD) setReadDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'r') } func (fd *netFD) setWriteDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { + d := t.Sub(time.Now()) + if mode == 'r' || mode == 'r'+'w' { + fd.rtimedout.setFalse() + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimedout.setFalse() + } + if t.IsZero() || d < 0 { + // Stop timer + if mode == 'r' || mode == 'r'+'w' { + if fd.rtimer != nil { + fd.rtimer.Stop() + } + fd.rtimer = nil + } + if mode == 'w' || mode == 'r'+'w' { + if fd.wtimer != nil { + fd.wtimer.Stop() + } + fd.wtimer = nil + } + } else { + // Interrupt I/O operation once timer has expired + if mode == 'r' || mode == 'r'+'w' { + fd.rtimer = time.AfterFunc(d, func() { + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + }) + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimer = time.AfterFunc(d, func() { + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + }) + } + } + if !t.IsZero() && d < 0 { + // Interrupt current I/O operation + if mode == 'r' || mode == 'r'+'w' { + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + } + } + return nil } func setReadBuffer(fd *netFD, bytes int) error { @@ -207,3 +298,7 @@ func setWriteBuffer(fd *netFD, bytes int) error { func isHangup(err error) bool { return err != nil && stringsHasSuffix(err.Error(), "Hangup") } + +func isInterrupted(err error) bool { + return err != nil && stringsHasSuffix(err.Error(), "interrupted") +} diff --git a/src/net/tcpsock_test.go b/src/net/tcpsock_test.go index 8b2d2ca484c..7c8610d32ba 100644 --- a/src/net/tcpsock_test.go +++ b/src/net/tcpsock_test.go @@ -467,6 +467,11 @@ func TestTCPConcurrentAccept(t *testing.T) { func TestTCPReadWriteAllocs(t *testing.T) { switch runtime.GOOS { + case "plan9": + // The implementation of asynchronous cancelable + // I/O on Plan 9 allocates memory. + // See net/fd_io_plan9.go. + t.Skipf("not supported on %s", runtime.GOOS) case "nacl": // NaCl needs to allocate pseudo file descriptor // stuff. See syscall/fd_nacl.go. diff --git a/src/runtime/net_plan9.go b/src/runtime/net_plan9.go new file mode 100644 index 00000000000..10fd089aea3 --- /dev/null +++ b/src/runtime/net_plan9.go @@ -0,0 +1,29 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime + +import ( + _ "unsafe" +) + +//go:linkname runtime_ignoreHangup net.runtime_ignoreHangup +func runtime_ignoreHangup() { + getg().m.ignoreHangup = true +} + +//go:linkname runtime_unignoreHangup net.runtime_unignoreHangup +func runtime_unignoreHangup(sig string) { + getg().m.ignoreHangup = false +} + +func ignoredNote(note *byte) bool { + if note == nil { + return false + } + if gostringnocopy(note) != "hangup" { + return false + } + return getg().m.ignoreHangup +} diff --git a/src/runtime/os3_plan9.go b/src/runtime/os3_plan9.go index aff1d05b258..26b4acd89a3 100644 --- a/src/runtime/os3_plan9.go +++ b/src/runtime/os3_plan9.go @@ -100,6 +100,9 @@ func sighandler(_ureg *ureg, note *byte, gp *g) int { return _NCONT } if flags&_SigNotify != 0 { + if ignoredNote(note) { + return _NCONT + } if sendNote(note) { return _NCONT } diff --git a/src/runtime/os_plan9.go b/src/runtime/os_plan9.go index 032aec1a46c..ba2d5c55258 100644 --- a/src/runtime/os_plan9.go +++ b/src/runtime/os_plan9.go @@ -13,6 +13,7 @@ type mOS struct { waitsemacount uint32 notesig *int8 errstr *byte + ignoreHangup bool } func closefd(fd int32) int32