Skip to content

Commit

Permalink
refactor: allow Subscribe::ready to be fallible
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Dec 21, 2023
1 parent 37300d3 commit fd63bc5
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 64 deletions.
19 changes: 11 additions & 8 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,16 @@ impl HostInputStream for HostIncomingBodyStream {

#[async_trait::async_trait]
impl Subscribe for HostIncomingBodyStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if !self.buffer.is_empty() || self.error.is_some() {
return;
return Ok(());
}

if let IncomingBodyStreamState::Open { body, .. } = &mut self.state {
let frame = body.frame().await;
self.record_frame(frame);
}
Ok(())
}
}

Expand Down Expand Up @@ -306,11 +307,11 @@ pub enum HostFutureTrailers {

#[async_trait::async_trait]
impl Subscribe for HostFutureTrailers {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};

// If the body is itself being read by a body stream then we need to
Expand Down Expand Up @@ -339,8 +340,8 @@ impl Subscribe for HostFutureTrailers {
// we have the body ourselves then read frames until trailers are found.
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};
let hyper_body = match &mut body.body {
IncomingBodyState::Start(body) => body,
Expand All @@ -360,6 +361,7 @@ impl Subscribe for HostFutureTrailers {
}
};
*self = HostFutureTrailers::Done(result);
Ok(())
}
}

Expand Down Expand Up @@ -627,10 +629,11 @@ impl HostOutputStream for BodyWriteStream {

#[async_trait::async_trait]
impl Subscribe for BodyWriteStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// Attempt to perform a reservation for a send. If there's capacity in
// the channel or it's already closed then this will return immediately.
// If the channel is full this will block until capacity opens up.
let _ = self.writer.reserve().await;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,10 @@ impl HostFutureIncomingResponse {

#[async_trait::async_trait]
impl Subscribe for HostFutureIncomingResponse {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let Self::Pending(handle) = self {
*self = Self::Ready(handle.await);
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl HostOutputStream for FileOutputStream {

#[async_trait::async_trait]
impl Subscribe for FileOutputStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let OutputState::Waiting(task) = &mut self.state {
self.state = match task.await {
Ok(nwritten) => {
Expand All @@ -297,6 +297,7 @@ impl Subscribe for FileOutputStream {
Err(e) => OutputState::Error(e),
};
}
Ok(())
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ enum Deadline {

#[async_trait::async_trait]
impl Subscribe for Deadline {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Ok(())
}
}
16 changes: 12 additions & 4 deletions crates/wasi/src/preview2/host/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,17 @@ impl<T: WasiView> streams::HostOutputStream for T {
) -> StreamResult<u64> {
use crate::preview2::Subscribe;

self.table_mut().get_mut(&dest)?.ready().await;
self.table_mut()
.get_mut(&dest)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.table_mut().get_mut(&src)?.ready().await;
self.table_mut()
.get_mut(&src)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.splice(dest, src, len).await
}
Expand Down Expand Up @@ -196,7 +204,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<Vec<u8>> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.read(stream, len).await
}
Expand All @@ -216,7 +224,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<u64> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.skip(stream, len).await
}
Expand Down
6 changes: 4 additions & 2 deletions crates/wasi/src/preview2/host/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,13 @@ impl<T: WasiView> udp::HostIncomingDatagramStream for T {

#[async_trait]
impl Subscribe for IncomingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
self.inner
.ready(Interest::READABLE)
.await
.expect("failed to await UDP socket readiness");
Ok(())
}
}

Expand Down Expand Up @@ -545,7 +546,7 @@ impl<T: WasiView> udp::HostOutgoingDatagramStream for T {

#[async_trait]
impl Subscribe for OutgoingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self.send_state {
SendState::Idle | SendState::Permitted(_) => {}
SendState::Waiting => {
Expand All @@ -557,5 +558,6 @@ impl Subscribe for OutgoingDatagramStream {
self.send_state = SendState::Idle;
}
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/ip_name_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ impl<T: WasiView> HostResolveAddressStream for T {

#[async_trait::async_trait]
impl Subscribe for ResolveAddressStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> Result<()> {
if let ResolveAddressStream::Waiting(future) = self {
*self = ResolveAddressStream::Done(future.await.map(|v| v.into_iter()));
}
Ok(())
}
}

Expand Down
65 changes: 48 additions & 17 deletions crates/wasi/src/preview2/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ impl HostInputStream for MemoryInputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryInputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -104,7 +106,9 @@ impl HostOutputStream for MemoryOutputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryOutputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl
Expand Down Expand Up @@ -193,16 +197,17 @@ impl HostInputStream for AsyncReadStream {
}
#[async_trait::async_trait]
impl Subscribe for AsyncReadStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if self.buffer.is_some() || self.closed {
return;
return Ok(());
}
match self.receiver.recv().await {
Some(res) => self.buffer = Some(res),
None => {
panic!("no more sender for an open AsyncReadStream - should be impossible")
}
}
Ok(())
}
}

Expand All @@ -227,7 +232,9 @@ impl HostOutputStream for SinkOutputStream {

#[async_trait::async_trait]
impl Subscribe for SinkOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// A stream that is ready immediately, but will always report that it's closed.
Expand All @@ -243,7 +250,9 @@ impl HostInputStream for ClosedInputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedInputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// An output stream that is always closed.
Expand All @@ -265,7 +274,9 @@ impl HostOutputStream for ClosedOutputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -323,7 +334,9 @@ mod test {
// The reader task hasn't run yet. Call `ready` to await and fill the buffer.
Ok(bs) => {
assert!(bs.is_empty());
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
res => panic!("unexpected: {res:?}"),
Expand All @@ -337,7 +350,9 @@ mod test {
let bs = reader.read(10).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(10).unwrap();
assert_eq!(bs.len(), 10);
Expand Down Expand Up @@ -367,7 +382,9 @@ mod test {
let bs = reader.read(123).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(123).unwrap();
assert_eq!(bs.len(), 123);
Expand All @@ -382,7 +399,9 @@ mod test {
Ok(bs) => {
assert!(bs.is_empty());
// Need to await to give this side time to catch up
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should show closed
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
Expand All @@ -402,7 +421,9 @@ mod test {
let bs = reader.read(1).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(1).unwrap();
assert_eq!(*bs, [123u8]);
Expand All @@ -426,7 +447,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// read the something else back out:
let bs = reader.read(1).unwrap();
Expand All @@ -448,7 +471,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// empty and now closed:
assert!(matches!(reader.read(1), Err(StreamError::Closed)));
Expand All @@ -468,15 +493,19 @@ mod test {
w
});

resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
let bs = reader.read(4097).unwrap();
assert_eq!(bs.len(), 4096);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Again we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
Expand All @@ -490,7 +519,9 @@ mod test {
drop(w);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader to be empty, and the stream closed:
assert!(matches!(reader.read(4097), Err(StreamError::Closed)));
Expand Down

0 comments on commit fd63bc5

Please sign in to comment.