Skip to content

Commit

Permalink
wasi: better poll_oneoff + test + compatibility
Browse files Browse the repository at this point in the history
Signed-off-by: Gaukas Wang <i@gaukas.wang>

wasi: better poll_oneoff

Signed-off-by: Gaukas Wang <i@gaukas.wang>

fix: poll_oneoff test behavior

* Adjust expected event order in a testcase since clock events are now acknowledged outside the loop.
* Always write nevents=0 first before start polling to detect faulty memory early.
* Poll on blocking stdin FdRead.
* Minor nits including if-else criteria/order.

Signed-off-by: Gaukas Wang <i@gaukas.wang>
  • Loading branch information
gaukas committed May 6, 2024
1 parent f67269a commit 919983e
Show file tree
Hide file tree
Showing 12 changed files with 466 additions and 17 deletions.
12 changes: 12 additions & 0 deletions experimental/sys/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,16 @@ type File interface {
// - This is like syscall.Close and `close` in POSIX. See
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html
Close() Errno

// [WATER SECTION BEGIN]

// Fd returns the underlying file descriptor owned by the host. If a file
// descriptor does not exist, it returns 0.
//
// This is a useful workaround since we want to collect the file descriptor
// from every file (including sockets and pipes) and pass them into the poll
// syscall in poll_oneoff.
Fd() uintptr

// [WATER SECTION END]
}
9 changes: 9 additions & 0 deletions experimental/sys/unimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,12 @@ func (UnimplementedFile) Utimens(int64, int64) Errno {

// Close implements File.Close
func (UnimplementedFile) Close() (errno Errno) { return }

// [WATER SECTION BEGIN]

// Fd implements File.Fd
func (UnimplementedFile) Fd() uintptr {
return 0 // not ENOSYS
}

// [WATER SECTION END]
16 changes: 8 additions & 8 deletions imports/wasi_snapshot_preview1/poll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,18 +376,18 @@ func Test_pollOneoff_Stdin(t *testing.T) {
out: 128, // past in
resultNevents: 512, // past out
expectedMem: []byte{
// Clock is acknowledged first.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum
// First an illegal file with custom user data should be acknowledged.
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata
byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,

// Then an illegal file with custom user data.
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata
byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
// Clock is acknowledged then.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,
Expand Down
252 changes: 252 additions & 0 deletions imports/wasi_snapshot_preview1/w_poll_oneoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
//go:build (linux || darwin || windows) && !tinygo

// Copyright 2024 The WATER Authors. All rights reserved.
// Use of this source code is governed by Apache 2 license
// that can be found in the LICENSE file.

package wasi_snapshot_preview1

import (
"context"
"time"

"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/fsapi"
internalsys "github.com/tetratelabs/wazero/internal/sys"
internalsysfs "github.com/tetratelabs/wazero/internal/sysfs"
"github.com/tetratelabs/wazero/internal/wasip1"
"github.com/tetratelabs/wazero/internal/wasm"
)

// use the init function to override the default pollOneoffFn
func init() {
// override the default pollOneoff
pollOneoff = newHostFunc(
wasip1.PollOneoffName, alternativePollOneoffFn,
[]api.ValueType{i32, i32, i32, i32},
"in", "out", "nsubscriptions", "result.nevents",
)
}

// alternativePollOneoffFn is a modified version of pollOneoffFn that
// tries to be more syscall-aligned. It should block and return only when
// there is at least one event triggered.
func alternativePollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
in := uint32(params[0])
out := uint32(params[1])
nsubscriptions := uint32(params[2])
resultNevents := uint32(params[3])

if nsubscriptions == 0 {
return sys.EINVAL // early returning on empty subscriptions list
}

mem := mod.Memory()

// Ensure capacity prior to the read loop to reduce error handling.
inBuf, ok := mem.Read(in, nsubscriptions*48)
if !ok {
return sys.EFAULT
}
outBuf, ok := mem.Read(out, nsubscriptions*32)
// zero-out all buffer before writing
for i := range outBuf {
outBuf[i] = 0
}

if !ok {
return sys.EFAULT
}

// start by writing 0 to resultNevents
if !mod.Memory().WriteUint32Le(resultNevents, 0) {
return sys.EFAULT
}

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the subscriptions that have been already written back to outBuf.
// nevents*32 returns at all times the offset where the next event should be written:
// this way we ensure that there are no gaps between records.
var nevents uint32

// Slice of all I/O events that will be written if triggered
var ioEvents []*event

// Slice of hostPollSub that will be used for polling
var hostPollSubs []internalsysfs.PollFd

// The clock event with the minimum timeout, if any.
var clkevent *event

// Layout is subscription_u: Union
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u
for i := uint32(0); i < nsubscriptions; i++ {
inOffset := i * 48
outOffset := nevents * 32

eventType := inBuf[inOffset+8] // +8 past userdata
// +8 past userdata +8 contents_offset
argBuf := inBuf[inOffset+8+8:]
userData := inBuf[inOffset : inOffset+8]

evt := &event{
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
}

switch eventType {
case wasip1.EventTypeClock: // handle later
newTimeout, err := processClockEvent(argBuf)
if err != 0 {
return err
}
// Min timeout.
if newTimeout < timeout {
timeout = newTimeout
// overwrite the clock event
clkevent = evt
}
case wasip1.EventTypeFdRead:
guestFd := int32(le.Uint32(argBuf))
if guestFd < 0 {
return sys.EBADF
}

if file, ok := fsc.LookupFile(guestFd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if guestFd == internalsys.FdStdin { // stdin is always checked with Poll function later.
if file.File.IsNonblock() { // non-blocking stdin is always ready to read
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
// if the fd is Stdin, and it is in blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
}
} else if hostFd := file.File.Fd(); hostFd == 0 {
evt.errno = wasip1.ErrnoNotsup
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
ioEvents = append(ioEvents, evt)
hostPollSubs = append(hostPollSubs, internalsysfs.PollFd{
Fd: hostFd,
Events: fsapi.POLLIN,
})
}
case wasip1.EventTypeFdWrite:
guestFd := int32(le.Uint32(argBuf))
if guestFd < 0 {
return sys.EBADF
}

if file, ok := fsc.LookupFile(guestFd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if guestFd == internalsys.FdStdout || guestFd == internalsys.FdStderr { // stdout and stderr are always ready to write
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if hostFd := file.File.Fd(); hostFd == 0 {
evt.errno = wasip1.ErrnoNotsup
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
ioEvents = append(ioEvents, evt)
hostPollSubs = append(hostPollSubs, internalsysfs.PollFd{
Fd: hostFd,
Events: fsapi.POLLOUT,
})
}
default:
return sys.EINVAL
}
}

// We have scanned all the subscriptions, and there are several cases:
// - Clock subscriptions-only: we block until the timeout expires.
// - At least one I/O subscription: we call poll on the I/O fds. Then we check the poll results
// and write back the corresponding events ONLY if the revent in pollFd is properly set.
// - If no clock subscription, we block with max timeout.
// - If there are clock subscriptions, we block with the minimum timeout.

// If there are no I/O subscriptions, we can block until the timeout expires.
sysCtx := mod.(*wasm.ModuleInstance).Sys
if len(ioEvents) == 0 {
if timeout > 0 && clkevent != nil { // there is a clock subscription with a timeout
sysCtx.Nanosleep(int64(timeout))
}
// Ack the clock event if there is one
if clkevent != nil {
writeEvent(outBuf[nevents*32:], clkevent)
nevents++
}
}

// If there are I/O subscriptions, we call poll on the I/O fds with the updated timeout.
if len(hostPollSubs) > 0 {
pollNevents, err := internalsysfs.Poll(hostPollSubs, int32(timeout.Milliseconds()))
if err != 0 {
return err
}

if pollNevents > 0 { // if there are events triggered
// iterate over hostPollSubs and if the revent is set, write back
// the event
for i, pollFd := range hostPollSubs {
if pollFd.Revents&pollFd.Events != 0 {
// write back the event
writeEvent(outBuf[nevents*32:], ioEvents[i])
nevents++
} else if pollFd.Revents != 0 {
// write back the event
writeEvent(outBuf[nevents*32:], ioEvents[i])
nevents++
}
}
} else { // otherwise it means that the timeout expired
// Ack the clock event if there is one (it can also be a default max timeout)
if clkevent != nil {
writeEvent(outBuf[nevents*32:], clkevent)
nevents++
}
}
}

// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}

// Wait for the timeout to expire, or for some data to become available on Stdin.
if stdinReady, errno := stdin.File.Poll(fsapi.POLLIN, int32(timeout.Milliseconds())); errno != 0 {
return errno
} else if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range blockingStdinSubs {
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf[nevents*32:], evt)
nevents++
}
}
}

// write nevents to resultNevents
if !mem.WriteUint32Le(resultNevents, nevents) {
return sys.EFAULT
}

return 0
}
8 changes: 8 additions & 0 deletions internal/fsapi/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,12 @@ const (

// POLLOUT is a write event.
POLLOUT

// [WATER SECTION BEGIN]

// POLLUNKNOWN is an unknown event. It is used to indicate that the
// event is not currently supported but there was an event set.
POLLUNKNOWN

// [WATER SECTION END]
)
11 changes: 11 additions & 0 deletions internal/sys/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ func (d *lazyDir) SetNonblock(bool) experimentalsys.Errno {
func (d *lazyDir) Poll(fsapi.Pflag, int32) (ready bool, errno experimentalsys.Errno) {
return false, experimentalsys.ENOSYS
}

// [WATER SECTION BEGIN]

// Fd implements the same method as documented on fsapi.File
//
// We do not know the file descriptor of a lazyDir, so we return 0.
func (d *lazyDir) Fd() uintptr {
return 0
}

// [WATER SECTION END]
9 changes: 9 additions & 0 deletions internal/sysfs/osfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ func (f *osFile) Poll(flag fsapi.Pflag, timeoutMillis int32) (ready bool, errno
return poll(f.fd, flag, timeoutMillis)
}

// [WATER SECTION BEGIN]

// Fd implements the same method as documented on sys.File
func (f *osFile) Fd() uintptr {
return f.fd
}

// [WATER SECTION END]

// Readdir implements File.Readdir. Notably, this uses "Readdir", not
// "ReadDir", from os.File.
func (f *osFile) Readdir(n int) (dirents []experimentalsys.Dirent, errno experimentalsys.Errno) {
Expand Down
1 change: 1 addition & 0 deletions internal/sysfs/poll_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func newPollFd(fd uintptr, events, revents int16) pollFd {

// _POLLIN subscribes a notification when any readable data is available.
const _POLLIN = 0x0001
const _POLLOUT = 0x0004 // [WATER] added _POLLOUT to support subscription to FdWrite events

// _poll implements poll on Darwin via the corresponding libc function.
func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
Expand Down
5 changes: 5 additions & 0 deletions internal/sysfs/poll_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ func newPollFd(fd uintptr, events, revents int16) pollFd {

// _POLLIN subscribes a notification when any readable data is available.
const _POLLIN = 0x0001
const _POLLOUT = 0x0004 // [WATER] added _POLLOUT to support subscription to FdWrite events

// _poll implements poll on Linux via ppoll.
func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
var ts syscall.Timespec
if timeoutMillis >= 0 {
ts = syscall.NsecToTimespec(int64(time.Duration(timeoutMillis) * time.Millisecond))
} else { // [WATER] Patched this branch to support negative timeouts for max duration
// just max out the timeout, simply giving a negative timeout will not work
// as it fails with EINVAL
ts = syscall.NsecToTimespec(1<<63 - 1)
}
return ppoll(fds, &ts)
}
Expand Down

0 comments on commit 919983e

Please sign in to comment.