mirror of
https://github.com/golang/go
synced 2024-11-11 18:21:40 -07:00
runtime: implement wasip1 netpoll
Implements netpoll using WASI's poll_oneoff system call. This enables non-blocking I/O support for wasip1. Change-Id: Ie395fa49d651c8b8262d485e2847dd65b0a10bc6 Reviewed-on: https://go-review.googlesource.com/c/go/+/493357 Reviewed-by: Dmitri Shuralyov <dmitshur@google.com> Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Matthew Dempsky <mdempsky@google.com> Reviewed-by: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com> Reviewed-by: Julien Fabre <ju.pryz@gmail.com> Auto-Submit: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com> Run-TryBot: Ian Lance Taylor <iant@golang.org>
This commit is contained in:
parent
04c628935d
commit
c5c2184538
@ -5,10 +5,10 @@
|
||||
|
||||
case "$GOWASIRUNTIME" in
|
||||
"wasmedge")
|
||||
exec wasmedge --dir=/ --env PWD="$PWD" "$1" "${@:2}"
|
||||
exec wasmedge --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" "${@:2}"
|
||||
;;
|
||||
"wasmer")
|
||||
exec wasmer run --dir=/ --env PWD="$PWD" "$1" -- "${@:2}"
|
||||
exec wasmer run --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" -- "${@:2}"
|
||||
;;
|
||||
"wasmtime")
|
||||
exec wasmtime run --dir=/ --env PWD="$PWD" --max-wasm-stack 1048576 "$1" -- "${@:2}"
|
||||
|
13
src/cmd/dist/test.go
vendored
13
src/cmd/dist/test.go
vendored
@ -574,7 +574,8 @@ func (t *tester) registerTests() {
|
||||
// registerStdTestSpecially tracks import paths in the standard library
|
||||
// whose test registration happens in a special way.
|
||||
registerStdTestSpecially := map[string]bool{
|
||||
"cmd/internal/testdir": true, // Registered at the bottom with sharding.
|
||||
"runtime/internal/wasitest": true, // Registered at the bottom as a host test.
|
||||
"cmd/internal/testdir": true, // Registered at the bottom with sharding.
|
||||
}
|
||||
|
||||
// Fast path to avoid the ~1 second of `go list std cmd` when
|
||||
@ -786,6 +787,16 @@ func (t *tester) registerTests() {
|
||||
t.registerCgoTests(cgoHeading)
|
||||
}
|
||||
|
||||
if goos == "wasip1" {
|
||||
t.registerTest("wasip1 host tests",
|
||||
&goTest{
|
||||
variant: "host",
|
||||
pkg: "runtime/internal/wasitest",
|
||||
timeout: 1 * time.Minute,
|
||||
runOnHost: true,
|
||||
})
|
||||
}
|
||||
|
||||
if goos != "android" && !t.iOS() {
|
||||
// Only start multiple test dir shards on builders,
|
||||
// where they get distributed to multiple machines.
|
||||
|
@ -2,7 +2,7 @@
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build unix
|
||||
//go:build unix || wasip1
|
||||
|
||||
package poll
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build (js && wasm) || wasip1
|
||||
//go:build js && wasm
|
||||
|
||||
package poll
|
||||
|
@ -2,7 +2,7 @@
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build unix || windows
|
||||
//go:build unix || windows || wasip1
|
||||
|
||||
package poll
|
||||
|
||||
|
14
src/runtime/internal/wasitest/host_test.go
Normal file
14
src/runtime/internal/wasitest/host_test.go
Normal file
@ -0,0 +1,14 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wasi_test
|
||||
|
||||
import "flag"
|
||||
|
||||
var target string
|
||||
|
||||
func init() {
|
||||
// The dist test runner passes -target when running this as a host test.
|
||||
flag.StringVar(&target, "target", "", "")
|
||||
}
|
96
src/runtime/internal/wasitest/nonblock_test.go
Normal file
96
src/runtime/internal/wasitest/nonblock_test.go
Normal file
@ -0,0 +1,96 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wasi_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// This test creates a set of FIFOs and writes to them in reverse order. It
|
||||
// checks that the output order matches the write order. The test binary opens
|
||||
// the FIFOs in their original order and spawns a goroutine for each that reads
|
||||
// from the FIFO and writes the result to stderr. If I/O was blocking, all
|
||||
// goroutines would be blocked waiting for one read call to return, and the
|
||||
// output order wouldn't match.
|
||||
|
||||
type fifo struct {
|
||||
file *os.File
|
||||
path string
|
||||
}
|
||||
|
||||
func TestNonblock(t *testing.T) {
|
||||
if target != "wasip1/wasm" {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
switch os.Getenv("GOWASIRUNTIME") {
|
||||
case "wazero", "":
|
||||
t.Skip("wazero does not support non-blocking I/O")
|
||||
case "wasmer":
|
||||
t.Skip("wasmer does not support non-blocking I/O")
|
||||
}
|
||||
|
||||
args := []string{"run", "./testdata/nonblock.go"}
|
||||
|
||||
fifos := make([]*fifo, 8)
|
||||
for i := range fifos {
|
||||
path := filepath.Join(t.TempDir(), fmt.Sprintf("wasip1-nonblock-fifo-%d-%d", rand.Uint32(), i))
|
||||
if err := syscall.Mkfifo(path, 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
file, err := os.OpenFile(path, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
args = append(args, path)
|
||||
fifos[len(fifos)-i-1] = &fifo{file, path}
|
||||
}
|
||||
|
||||
subProcess := exec.Command("go", args...)
|
||||
|
||||
subProcess.Env = append(os.Environ(), "GOOS=wasip1", "GOARCH=wasm")
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
defer pw.Close()
|
||||
|
||||
subProcess.Stderr = pw
|
||||
|
||||
if err := subProcess.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(pr)
|
||||
if !scanner.Scan() {
|
||||
t.Fatal("expected line:", scanner.Err())
|
||||
} else if scanner.Text() != "waiting" {
|
||||
t.Fatal("unexpected output:", scanner.Text())
|
||||
}
|
||||
|
||||
for _, fifo := range fifos {
|
||||
if _, err := fifo.file.WriteString(fifo.path + "\n"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !scanner.Scan() {
|
||||
t.Fatal("expected line:", scanner.Err())
|
||||
} else if scanner.Text() != fifo.path {
|
||||
t.Fatal("unexpected line:", scanner.Text())
|
||||
}
|
||||
}
|
||||
|
||||
if err := subProcess.Wait(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
48
src/runtime/internal/wasitest/testdata/nonblock.go
vendored
Normal file
48
src/runtime/internal/wasitest/testdata/nonblock.go
vendored
Normal file
@ -0,0 +1,48 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ready := make(chan struct{})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, path := range os.Args[1:] {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
spawnWait := make(chan struct{})
|
||||
|
||||
wg.Add(1)
|
||||
go func(f *os.File) {
|
||||
defer f.Close()
|
||||
defer wg.Done()
|
||||
|
||||
close(spawnWait)
|
||||
|
||||
<-ready
|
||||
|
||||
var buf [256]byte
|
||||
n, err := f.Read(buf[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
os.Stderr.Write(buf[:n])
|
||||
}(f)
|
||||
|
||||
// Spawn one goroutine at a time.
|
||||
<-spawnWait
|
||||
}
|
||||
|
||||
println("waiting")
|
||||
close(ready)
|
||||
wg.Wait()
|
||||
}
|
@ -336,8 +336,8 @@ func poll_runtime_pollWait(pd *pollDesc, mode int) int {
|
||||
if errcode != pollNoError {
|
||||
return errcode
|
||||
}
|
||||
// As for now only Solaris, illumos, and AIX use level-triggered IO.
|
||||
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
|
||||
// As for now only Solaris, illumos, AIX and wasip1 use level-triggered IO.
|
||||
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
|
||||
netpollarm(pd, mode)
|
||||
}
|
||||
for !netpollblock(pd, int32(mode), false) {
|
||||
|
@ -2,10 +2,10 @@
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Fake network poller for js/wasm and wasip1/wasm.
|
||||
// Should never be used, because wasm network connections do not honor "SetNonblock".
|
||||
// Fake network poller for js/wasm.
|
||||
// Should never be used, because js/wasm network connections do not honor "SetNonblock".
|
||||
|
||||
//go:build (js && wasm) || wasip1
|
||||
//go:build js && wasm
|
||||
|
||||
package runtime
|
||||
|
||||
|
254
src/runtime/netpoll_wasip1.go
Normal file
254
src/runtime/netpoll_wasip1.go
Normal file
@ -0,0 +1,254 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build wasip1
|
||||
|
||||
package runtime
|
||||
|
||||
import "unsafe"
|
||||
|
||||
// WASI network poller.
|
||||
//
|
||||
// WASI preview 1 includes a poll_oneoff host function that behaves similarly
|
||||
// to poll(2) on Linux. Like poll(2), poll_oneoff is level triggered. It
|
||||
// accepts one or more subscriptions to FD read or write events.
|
||||
//
|
||||
// Major differences to poll(2):
|
||||
// - the events are not written to the input entries (like pollfd.revents), and
|
||||
// instead are appended to a separate events buffer. poll_oneoff writes zero
|
||||
// or more events to the buffer (at most one per input subscription) and
|
||||
// returns the number of events written. Although the index of the
|
||||
// subscriptions might not match the index of the associated event in the
|
||||
// events buffer, both the subscription and event structs contain a userdata
|
||||
// field and when a subscription yields an event the userdata fields will
|
||||
// match.
|
||||
// - there's no explicit timeout parameter, although a time limit can be added
|
||||
// by using "clock" subscriptions.
|
||||
// - each FD subscription can either be for a read or a write, but not both.
|
||||
// This is in contrast to poll(2) which accepts a mask with POLLIN and
|
||||
// POLLOUT bits, allowing for a subscription to either, neither, or both
|
||||
// reads and writes.
|
||||
//
|
||||
// Since poll_oneoff is similar to poll(2), the implementation here was derived
|
||||
// from netpoll_aix.go.
|
||||
|
||||
const _EINTR = 27
|
||||
|
||||
var (
|
||||
evts []event
|
||||
subs []subscription
|
||||
pds []*pollDesc
|
||||
mtx mutex
|
||||
)
|
||||
|
||||
func netpollinit() {
|
||||
// Unlike poll(2), WASI's poll_oneoff doesn't accept a timeout directly. To
|
||||
// prevent it from blocking indefinitely, a clock subscription with a
|
||||
// timeout field needs to be submitted. Reserve a slot here for the clock
|
||||
// subscription, and set fields that won't change between poll_oneoff calls.
|
||||
|
||||
subs = make([]subscription, 1, 128)
|
||||
evts = make([]event, 0, 128)
|
||||
pds = make([]*pollDesc, 0, 128)
|
||||
|
||||
timeout := &subs[0]
|
||||
eventtype := timeout.u.eventtype()
|
||||
*eventtype = eventtypeClock
|
||||
clock := timeout.u.subscriptionClock()
|
||||
clock.id = clockMonotonic
|
||||
clock.precision = 1e3
|
||||
}
|
||||
|
||||
func netpollIsPollDescriptor(fd uintptr) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func netpollopen(fd uintptr, pd *pollDesc) int32 {
|
||||
lock(&mtx)
|
||||
|
||||
// We don't worry about pd.fdseq here,
|
||||
// as mtx protects us from stale pollDescs.
|
||||
|
||||
pds = append(pds, pd)
|
||||
|
||||
// The 32-bit pd.user field holds the index of the read subscription in the
|
||||
// upper 16 bits, and index of the write subscription in the lower bits.
|
||||
// A disarmed=^uint16(0) sentinel is used to represent no subscription.
|
||||
// There is thus a maximum of 65535 total subscriptions.
|
||||
pd.user = uint32(disarmed)<<16 | uint32(disarmed)
|
||||
|
||||
unlock(&mtx)
|
||||
return 0
|
||||
}
|
||||
|
||||
const disarmed = 0xFFFF
|
||||
|
||||
func netpollarm(pd *pollDesc, mode int) {
|
||||
lock(&mtx)
|
||||
|
||||
var s subscription
|
||||
|
||||
s.userdata = userdata(uintptr(unsafe.Pointer(pd)))
|
||||
|
||||
fdReadwrite := s.u.subscriptionFdReadwrite()
|
||||
fdReadwrite.fd = int32(pd.fd)
|
||||
|
||||
ridx := int(pd.user >> 16)
|
||||
widx := int(pd.user & 0xFFFF)
|
||||
|
||||
if (mode == 'r' && ridx != disarmed) || (mode == 'w' && widx != disarmed) {
|
||||
unlock(&mtx)
|
||||
return
|
||||
}
|
||||
|
||||
eventtype := s.u.eventtype()
|
||||
switch mode {
|
||||
case 'r':
|
||||
*eventtype = eventtypeFdRead
|
||||
ridx = len(subs)
|
||||
case 'w':
|
||||
*eventtype = eventtypeFdWrite
|
||||
widx = len(subs)
|
||||
}
|
||||
|
||||
if len(subs) == disarmed {
|
||||
throw("overflow")
|
||||
}
|
||||
|
||||
pd.user = uint32(ridx)<<16 | uint32(widx)
|
||||
|
||||
subs = append(subs, s)
|
||||
evts = append(evts, event{})
|
||||
|
||||
unlock(&mtx)
|
||||
}
|
||||
|
||||
func netpolldisarm(pd *pollDesc, mode int32) {
|
||||
switch mode {
|
||||
case 'r':
|
||||
removesub(int(pd.user >> 16))
|
||||
case 'w':
|
||||
removesub(int(pd.user & 0xFFFF))
|
||||
case 'r' + 'w':
|
||||
removesub(int(pd.user >> 16))
|
||||
removesub(int(pd.user & 0xFFFF))
|
||||
}
|
||||
}
|
||||
|
||||
func removesub(i int) {
|
||||
if i == disarmed {
|
||||
return
|
||||
}
|
||||
j := len(subs) - 1
|
||||
|
||||
pdi := (*pollDesc)(unsafe.Pointer(uintptr(subs[i].userdata)))
|
||||
pdj := (*pollDesc)(unsafe.Pointer(uintptr(subs[j].userdata)))
|
||||
|
||||
swapsub(pdi, i, disarmed)
|
||||
swapsub(pdj, j, i)
|
||||
|
||||
subs = subs[:j]
|
||||
}
|
||||
|
||||
func swapsub(pd *pollDesc, from, to int) {
|
||||
if from == to {
|
||||
return
|
||||
}
|
||||
ridx := int(pd.user >> 16)
|
||||
widx := int(pd.user & 0xFFFF)
|
||||
if ridx == from {
|
||||
ridx = to
|
||||
} else if widx == from {
|
||||
widx = to
|
||||
}
|
||||
pd.user = uint32(ridx)<<16 | uint32(widx)
|
||||
if to != disarmed {
|
||||
subs[to], subs[from] = subs[from], subs[to]
|
||||
}
|
||||
}
|
||||
|
||||
func netpollclose(fd uintptr) int32 {
|
||||
lock(&mtx)
|
||||
for i := 0; i < len(pds); i++ {
|
||||
if pds[i].fd == fd {
|
||||
netpolldisarm(pds[i], 'r'+'w')
|
||||
pds[i] = pds[len(pds)-1]
|
||||
pds = pds[:len(pds)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
unlock(&mtx)
|
||||
return 0
|
||||
}
|
||||
|
||||
func netpollBreak() {}
|
||||
|
||||
func netpoll(delay int64) gList {
|
||||
lock(&mtx)
|
||||
|
||||
// If delay >= 0, we include a subscription of type Clock that we use as
|
||||
// a timeout. If delay < 0, we omit the subscription and allow poll_oneoff
|
||||
// to block indefinitely.
|
||||
pollsubs := subs
|
||||
if delay >= 0 {
|
||||
timeout := &subs[0]
|
||||
clock := timeout.u.subscriptionClock()
|
||||
clock.timeout = uint64(delay)
|
||||
} else {
|
||||
pollsubs = subs[1:]
|
||||
}
|
||||
|
||||
if len(pollsubs) == 0 {
|
||||
unlock(&mtx)
|
||||
return gList{}
|
||||
}
|
||||
|
||||
evts = evts[:len(pollsubs)]
|
||||
for i := range evts {
|
||||
evts[i] = event{}
|
||||
}
|
||||
|
||||
retry:
|
||||
var nevents size
|
||||
errno := poll_oneoff(unsafe.Pointer(&pollsubs[0]), unsafe.Pointer(&evts[0]), uint32(len(pollsubs)), unsafe.Pointer(&nevents))
|
||||
if errno != 0 {
|
||||
if errno != _EINTR {
|
||||
println("errno=", errno, " len(pollsubs)=", len(pollsubs))
|
||||
throw("poll_oneoff failed")
|
||||
}
|
||||
// If a timed sleep was interrupted, just return to
|
||||
// recalculate how long we should sleep now.
|
||||
if delay > 0 {
|
||||
unlock(&mtx)
|
||||
return gList{}
|
||||
}
|
||||
goto retry
|
||||
}
|
||||
|
||||
var toRun gList
|
||||
for i := 0; i < int(nevents); i++ {
|
||||
e := &evts[i]
|
||||
if e.typ == eventtypeClock {
|
||||
continue
|
||||
}
|
||||
|
||||
hangup := e.fdReadwrite.flags&fdReadwriteHangup != 0
|
||||
var mode int32
|
||||
if e.typ == eventtypeFdRead || e.error != 0 || hangup {
|
||||
mode += 'r'
|
||||
}
|
||||
if e.typ == eventtypeFdWrite || e.error != 0 || hangup {
|
||||
mode += 'w'
|
||||
}
|
||||
if mode != 0 {
|
||||
pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata)))
|
||||
netpolldisarm(pd, mode)
|
||||
pd.setEventErr(e.error != 0, 0)
|
||||
netpollready(&toRun, pd, mode)
|
||||
}
|
||||
}
|
||||
|
||||
unlock(&mtx)
|
||||
return toRun
|
||||
}
|
@ -123,6 +123,10 @@ type subscriptionClock struct {
|
||||
flags subclockflags
|
||||
}
|
||||
|
||||
type subscriptionFdReadwrite struct {
|
||||
fd int32
|
||||
}
|
||||
|
||||
type subscription struct {
|
||||
userdata userdata
|
||||
u subscriptionUnion
|
||||
@ -138,6 +142,10 @@ func (u *subscriptionUnion) subscriptionClock() *subscriptionClock {
|
||||
return (*subscriptionClock)(unsafe.Pointer(&u[1]))
|
||||
}
|
||||
|
||||
func (u *subscriptionUnion) subscriptionFdReadwrite() *subscriptionFdReadwrite {
|
||||
return (*subscriptionFdReadwrite)(unsafe.Pointer(&u[1]))
|
||||
}
|
||||
|
||||
//go:wasmimport wasi_snapshot_preview1 poll_oneoff
|
||||
//go:noescape
|
||||
func poll_oneoff(in, out unsafe.Pointer, nsubscriptions size, nevents unsafe.Pointer) errno
|
||||
|
Loading…
Reference in New Issue
Block a user