Skip to content

Commit

Permalink
feat: support for target wasm32-wasi
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Zak <richard@profian.com>
Co-authored-by: Harald Hoyer <harald@profian.com>
  • Loading branch information
rjzak and haraldh committed Jun 6, 2022
1 parent 7011a68 commit a55272a
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .cargo/config
@@ -1,2 +1,2 @@
# [build]
# rustflags = ["--cfg", "tokio_unstable"]
# rustflags = ["--cfg", "tokio_unstable"]
1 change: 1 addition & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -29,6 +29,7 @@ cfg_codec! {
}

cfg_net! {
#[cfg(not(target_arch = "wasm32"))]
pub mod udp;
pub mod net;
}
Expand Down
7 changes: 7 additions & 0 deletions tokio-util/src/net/mod.rs
Expand Up @@ -39,9 +39,16 @@ impl Listener for tokio::net::TcpListener {
Self::poll_accept(self, cx)
}

#[cfg(not(target_os = "wasi"))]
fn local_addr(&self) -> Result<Self::Addr> {
self.local_addr().map(Into::into)
}

#[cfg(target_os = "wasi")]
fn local_addr(&self) -> Result<Self::Addr> {
use std::io;
Err(io::ErrorKind::Unsupported.into())
}
}

/// Future for accepting a new connection from a listener.
Expand Down
8 changes: 5 additions & 3 deletions tokio/Cargo.toml
Expand Up @@ -51,7 +51,6 @@ net = [
"mio/os-poll",
"mio/os-ext",
"mio/net",
"socket2",
"winapi/namedpipeapi",
]
process = [
Expand Down Expand Up @@ -97,11 +96,14 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.1", optional = true }
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
#mio = { version = "0.8.1", optional = true }
mio = { git = "https://github.com/tokio-rs/mio/", branch = "master", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
socket2 = { version = "0.4.4", features = [ "all" ] }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -72,6 +72,7 @@ pub(super) struct Inner {
io_dispatch: RwLock<IoDispatcher>,

/// Used to wake up the reactor from a call to `turn`.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,

metrics: IoDriverMetrics,
Expand Down Expand Up @@ -115,8 +116,15 @@ impl Driver {
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;

//#[cfg(not(target_os = "wasi"))]
let registry = poll.registry().try_clone()?;
//#[cfg(target_os = "wasi")]
//let registry = poll.registry();

// let registry = poll.registry().clone();

let slab = Slab::new();
let allocator = slab.allocator();
Expand All @@ -129,6 +137,7 @@ impl Driver {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
}),
Expand Down Expand Up @@ -299,6 +308,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
#[cfg(not(target_os = "wasi"))]
self.inner.waker.wake().expect("failed to wake I/O driver");
}
}
Expand Down
1 change: 0 additions & 1 deletion tokio/src/io/driver/registration.rs
Expand Up @@ -238,7 +238,6 @@ cfg_io_readiness! {
pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
loop {
let event = self.readiness(interest).await?;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -126,6 +126,7 @@ impl<E: Source> PollEvented<E> {

/// Deregisters the inner io from the registration and returns a Result containing the inner io.
#[cfg(any(feature = "net", feature = "process"))]
#[cfg(not(target_os = "wasi"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
self.registration.deregister(&mut inner)?;
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/lib.rs
Expand Up @@ -410,7 +410,9 @@ mod future;
pub mod io;
pub mod net;

#[cfg(not(target_arch = "wasm32-wasi"))]
mod loom;
#[cfg(not(target_arch = "wasm32-wasi"))]
mod park;

cfg_process! {
Expand Down
24 changes: 20 additions & 4 deletions tokio/src/macros/cfg.rs
Expand Up @@ -61,6 +61,7 @@ macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
Expand All @@ -86,6 +87,7 @@ macro_rules! cfg_io_driver {
feature = "process",
all(unix, feature = "signal"),
))))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand All @@ -99,6 +101,7 @@ macro_rules! cfg_io_driver_impl {
feature = "process",
all(unix, feature = "signal"),
))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand All @@ -112,6 +115,7 @@ macro_rules! cfg_not_io_driver {
feature = "process",
all(unix, feature = "signal"),
)))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -154,13 +158,21 @@ macro_rules! cfg_not_io_util {

macro_rules! cfg_loom {
($($item:item)*) => {
$( #[cfg(loom)] $item )*
$(
#[cfg(loom)]
#[cfg(not(target_os = "wasi"))]
$item
)*
}
}

macro_rules! cfg_not_loom {
($($item:item)*) => {
$( #[cfg(not(loom))] $item )*
$(
#[cfg(not(loom))]
#[cfg(target_os = "wasi")]
$item
)*
}
}

Expand Down Expand Up @@ -247,6 +259,7 @@ macro_rules! cfg_process {
#[cfg(feature = "process")]
#[cfg_attr(docsrs, doc(cfg(feature = "process")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -275,6 +288,7 @@ macro_rules! cfg_signal {
#[cfg(feature = "signal")]
#[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -451,7 +465,8 @@ macro_rules! cfg_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
)))]
$item
)*
Expand All @@ -465,7 +480,8 @@ macro_rules! cfg_not_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
))]
$item
)*
Expand Down
21 changes: 12 additions & 9 deletions tokio/src/net/mod.rs
Expand Up @@ -23,7 +23,7 @@
//! [`UnixDatagram`]: UnixDatagram

mod addr;
#[cfg(feature = "net")]
#[cfg(all(feature = "net", not(target_os = "wasi")))]
pub(crate) use addr::to_socket_addrs;
pub use addr::ToSocketAddrs;

Expand All @@ -33,20 +33,23 @@ cfg_net! {

pub mod tcp;
pub use tcp::listener::TcpListener;
pub use tcp::socket::TcpSocket;
pub use tcp::stream::TcpStream;

mod udp;
pub use udp::UdpSocket;
}

cfg_net_unix! {
pub mod unix;
pub use unix::datagram::socket::UnixDatagram;
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
pub mod unix;
pub use unix::datagram::socket::UnixDatagram;
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
pub use tcp::socket::TcpSocket;
mod udp;
pub use udp::UdpSocket;
}

cfg_net_windows! {
pub mod windows;
pub mod windows;
pub use tcp::socket::TcpSocket;
mod udp;
pub use udp::UdpSocket;
}
43 changes: 42 additions & 1 deletion tokio/src/net/tcp/listener.rs
@@ -1,10 +1,13 @@
use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;

#[cfg(not(target_os = "wasi"))]
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::fmt;
use std::io;
//use std::io::Error;
use std::net::{self, SocketAddr};
use std::task::{Context, Poll};

Expand Down Expand Up @@ -94,6 +97,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = to_socket_addrs(addr).await?;

Expand All @@ -114,6 +118,7 @@ impl TcpListener {
}))
}

#[cfg(not(target_os = "wasi"))]
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
Expand Down Expand Up @@ -154,13 +159,34 @@ impl TcpListener {
/// }
/// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
println!("Tokio TcpListener::accept() {}:{}", file!(), line!());
let (mio, addr) = self
.io
.registration()
.async_io(Interest::READABLE, || self.io.accept())
.await?;

println!("Tokio TcpListener::accept() self.io.registration().async_io().await {}:{}", file!(), line!());
let mut small_buffer = [0u8;200];
match mio.peek(&mut small_buffer) {
Ok(x) => {
let buffer_string = String::from_utf8_lossy(&small_buffer[..x]).to_string();
println!("Tokio TcpListener::accept() buffer {} size {} @ {}:{}", buffer_string, x, file!(), line!());
}
Err(e) => {
println!("Tokio TcpListener::accept() error {} trying to peek() at the buffer {}:{}", e, file!(), line!());
}
}
let stream = TcpStream::new(mio)?;
match stream.try_read(&mut small_buffer) {
Ok(x) => {
let buffer_string = String::from_utf8_lossy(&small_buffer[..x]).to_string();
println!("Tokio TcpListener::accept() buffer after TcpStream::new(): {} size {} @ {}:{}", buffer_string, x, file!(), line!());
}
Err(e) => {
println!("Tokio TcpListener::accept() error {} trying to TcpStream::try_read() at the buffer {}:{}", e, file!(), line!());
}
}
println!("Tokio TcpListener::accept() TcpStream::new() {}:{}", file!(), line!());
Ok((stream, addr))
}

Expand Down Expand Up @@ -249,6 +275,7 @@ impl TcpListener {
/// [`tokio::net::TcpListener`]: TcpListener
/// [`std::net::TcpListener`]: std::net::TcpListener
/// [`set_nonblocking`]: fn@std::net::TcpListener::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
#[cfg(unix)]
{
Expand All @@ -269,6 +296,7 @@ impl TcpListener {
}
}

#[cfg(not(target_os = "wasi"))]
pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
Expand Down Expand Up @@ -297,6 +325,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
Expand Down Expand Up @@ -384,6 +413,18 @@ mod sys {
}
}

#[cfg(target_os = "wasi")]
mod sys {
use super::TcpListener;
use std::os::wasi::prelude::*;

impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
}

#[cfg(windows)]
mod sys {
use super::TcpListener;
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/net/tcp/socket.rs
@@ -1,3 +1,5 @@
#![cfg(not(target_os = "wasi"))]

use crate::net::{TcpListener, TcpStream};

use std::fmt;
Expand Down

0 comments on commit a55272a

Please sign in to comment.