Skip to content

Commit

Permalink
refactor: allow Subscribe::ready to be fallible
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Dec 20, 2023
1 parent 61e1cdf commit d8dfec4
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 53 deletions.
19 changes: 11 additions & 8 deletions crates/wasi-http/src/body.rs
Expand Up @@ -207,15 +207,16 @@ impl HostInputStream for HostIncomingBodyStream {

#[async_trait::async_trait]
impl Subscribe for HostIncomingBodyStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if !self.buffer.is_empty() || self.error.is_some() {
return;
return Ok(());
}

if let IncomingBodyStreamState::Open { body, .. } = &mut self.state {
let frame = body.frame().await;
self.record_frame(frame);
}
Ok(())
}
}

Expand Down Expand Up @@ -306,11 +307,11 @@ pub enum HostFutureTrailers {

#[async_trait::async_trait]
impl Subscribe for HostFutureTrailers {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};

// If the body is itself being read by a body stream then we need to
Expand Down Expand Up @@ -339,8 +340,8 @@ impl Subscribe for HostFutureTrailers {
// we have the body ourselves then read frames until trailers are found.
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};
let hyper_body = match &mut body.body {
IncomingBodyState::Start(body) => body,
Expand All @@ -360,6 +361,7 @@ impl Subscribe for HostFutureTrailers {
}
};
*self = HostFutureTrailers::Done(result);
Ok(())
}
}

Expand Down Expand Up @@ -627,10 +629,11 @@ impl HostOutputStream for BodyWriteStream {

#[async_trait::async_trait]
impl Subscribe for BodyWriteStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// Attempt to perform a reservation for a send. If there's capacity in
// the channel or it's already closed then this will return immediately.
// If the channel is full this will block until capacity opens up.
let _ = self.writer.reserve().await;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi-http/src/types.rs
Expand Up @@ -430,9 +430,10 @@ impl HostFutureIncomingResponse {

#[async_trait::async_trait]
impl Subscribe for HostFutureIncomingResponse {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let Self::Pending(handle) = self {
*self = Self::Ready(handle.await);
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/filesystem.rs
Expand Up @@ -285,7 +285,7 @@ impl HostOutputStream for FileOutputStream {

#[async_trait::async_trait]
impl Subscribe for FileOutputStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let OutputState::Waiting(task) = &mut self.state {
self.state = match task.await {
Ok(nwritten) => {
Expand All @@ -297,6 +297,7 @@ impl Subscribe for FileOutputStream {
Err(e) => OutputState::Error(e),
};
}
Ok(())
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Expand Up @@ -93,11 +93,12 @@ enum Deadline {

#[async_trait::async_trait]
impl Subscribe for Deadline {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Ok(())
}
}
16 changes: 12 additions & 4 deletions crates/wasi/src/preview2/host/io.rs
Expand Up @@ -165,9 +165,17 @@ impl<T: WasiView> streams::HostOutputStream for T {
) -> StreamResult<u64> {
use crate::preview2::Subscribe;

self.table_mut().get_mut(&dest)?.ready().await;
self.table_mut()
.get_mut(&dest)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.table_mut().get_mut(&src)?.ready().await;
self.table_mut()
.get_mut(&src)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.splice(dest, src, len).await
}
Expand Down Expand Up @@ -196,7 +204,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<Vec<u8>> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.read(stream, len).await
}
Expand All @@ -216,7 +224,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<u64> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.skip(stream, len).await
}
Expand Down
6 changes: 4 additions & 2 deletions crates/wasi/src/preview2/host/udp.rs
Expand Up @@ -411,12 +411,13 @@ impl<T: WasiView> udp::HostIncomingDatagramStream for T {

#[async_trait]
impl Subscribe for IncomingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
self.inner
.ready(Interest::READABLE)
.await
.expect("failed to await UDP socket readiness");
Ok(())
}
}

Expand Down Expand Up @@ -545,7 +546,7 @@ impl<T: WasiView> udp::HostOutgoingDatagramStream for T {

#[async_trait]
impl Subscribe for OutgoingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self.send_state {
SendState::Idle | SendState::Permitted(_) => {}
SendState::Waiting => {
Expand All @@ -557,5 +558,6 @@ impl Subscribe for OutgoingDatagramStream {
self.send_state = SendState::Idle;
}
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/ip_name_lookup.rs
Expand Up @@ -80,10 +80,11 @@ impl<T: WasiView> HostResolveAddressStream for T {

#[async_trait::async_trait]
impl Subscribe for ResolveAddressStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> Result<()> {
if let ResolveAddressStream::Waiting(future) = self {
*self = ResolveAddressStream::Done(future.await.map(|v| v.into_iter()));
}
Ok(())
}
}

Expand Down
25 changes: 18 additions & 7 deletions crates/wasi/src/preview2/pipe.rs
Expand Up @@ -49,7 +49,9 @@ impl HostInputStream for MemoryInputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryInputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -104,7 +106,9 @@ impl HostOutputStream for MemoryOutputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryOutputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl
Expand Down Expand Up @@ -193,16 +197,17 @@ impl HostInputStream for AsyncReadStream {
}
#[async_trait::async_trait]
impl Subscribe for AsyncReadStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if self.buffer.is_some() || self.closed {
return;
return Ok(());
}
match self.receiver.recv().await {
Some(res) => self.buffer = Some(res),
None => {
panic!("no more sender for an open AsyncReadStream - should be impossible")
}
}
Ok(())
}
}

Expand All @@ -227,7 +232,9 @@ impl HostOutputStream for SinkOutputStream {

#[async_trait::async_trait]
impl Subscribe for SinkOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// A stream that is ready immediately, but will always report that it's closed.
Expand All @@ -243,7 +250,9 @@ impl HostInputStream for ClosedInputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedInputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// An output stream that is always closed.
Expand All @@ -265,7 +274,9 @@ impl HostOutputStream for ClosedOutputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down
26 changes: 13 additions & 13 deletions crates/wasi/src/preview2/poll.rs
Expand Up @@ -7,7 +7,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use wasmtime::component::{Resource, ResourceTable};

pub type PollableFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
pub type PollableFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>;
pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + Sync + 'static>;

Expand All @@ -25,7 +25,7 @@ pub struct Pollable {

#[async_trait::async_trait]
pub trait Subscribe: Send + Sync + 'static {
async fn ready(&mut self);
async fn ready(&mut self) -> Result<()>;
}

/// Creates a `pollable` resource which is susbcribed to the provided
Expand Down Expand Up @@ -92,29 +92,29 @@ impl<T: WasiView> poll::Host for T {
futures: Vec<(PollableFuture<'a>, Vec<ReadylistIndex>)>,
}
impl<'a> Future for PollList<'a> {
type Output = Vec<u32>;
type Output = Result<Vec<u32>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut any_ready = false;
let mut results = Vec::new();
for (fut, readylist_indicies) in self.futures.iter_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready(()) => {
Poll::Ready(Ok(())) => {
results.extend_from_slice(readylist_indicies);
any_ready = true;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {}
}
}
if any_ready {
Poll::Ready(results)
Poll::Ready(Ok(results))
} else {
Poll::Pending
}
}
}

Ok(PollList { futures }.await)
PollList { futures }.await
}
}

Expand All @@ -124,18 +124,18 @@ impl<T: WasiView> crate::preview2::bindings::io::poll::HostPollable for T {
let table = self.table_mut();
let pollable = table.get(&pollable)?;
let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
ready.await;
Ok(())
ready.await
}
async fn ready(&mut self, pollable: Resource<Pollable>) -> Result<bool> {
let table = self.table_mut();
let pollable = table.get(&pollable)?;
let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
futures::pin_mut!(ready);
Ok(matches!(
futures::future::poll_immediate(ready).await,
Some(())
))
match futures::future::poll_immediate(ready).await {
None => Ok(false),
Some(Err(err)) => Err(err),
Some(..) => Ok(true),
}
}
fn drop(&mut self, pollable: Resource<Pollable>) -> Result<()> {
let pollable = self.table_mut().delete(pollable)?;
Expand Down
4 changes: 3 additions & 1 deletion crates/wasi/src/preview2/stdio.rs
Expand Up @@ -180,7 +180,9 @@ impl HostOutputStream for OutputStream {

#[async_trait::async_trait]
impl Subscribe for OutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
5 changes: 3 additions & 2 deletions crates/wasi/src/preview2/stdio/worker_thread_stdin.rs
Expand Up @@ -149,7 +149,7 @@ impl HostInputStream for Stdin {

#[async_trait::async_trait]
impl Subscribe for Stdin {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
let g = GlobalStdin::get();

// Scope the synchronous `state.lock()` to this block which does not
Expand All @@ -164,10 +164,11 @@ impl Subscribe for Stdin {
g.read_completed.notified()
}
StdinState::ReadRequested => g.read_completed.notified(),
StdinState::Data(_) | StdinState::Closed | StdinState::Error(_) => return,
StdinState::Data(_) | StdinState::Closed | StdinState::Error(_) => return Ok(()),
}
};

notified.await;
Ok(())
}
}

0 comments on commit d8dfec4

Please sign in to comment.