Skip to content

Commit

Permalink
feat: support for target wasm32-wasi
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Zak <richard@profian.com>
Co-authored-by: Harald Hoyer <harald@profian.com>
  • Loading branch information
rjzak and haraldh committed Jun 6, 2022
1 parent 340c4dc commit 95fcfa7
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 8 deletions.
1 change: 1 addition & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -29,6 +29,7 @@ cfg_codec! {
}

cfg_net! {
#[cfg(not(target_arch = "wasm32"))]
pub mod udp;
pub mod net;
}
Expand Down
7 changes: 7 additions & 0 deletions tokio-util/src/net/mod.rs
Expand Up @@ -39,9 +39,16 @@ impl Listener for tokio::net::TcpListener {
Self::poll_accept(self, cx)
}

#[cfg(not(target_os = "wasi"))]
fn local_addr(&self) -> Result<Self::Addr> {
self.local_addr().map(Into::into)
}

#[cfg(target_os = "wasi")]
fn local_addr(&self) -> Result<Self::Addr> {
use std::io;
Err(io::ErrorKind::Unsupported.into())
}
}

/// Future for accepting a new connection from a listener.
Expand Down
8 changes: 5 additions & 3 deletions tokio/Cargo.toml
Expand Up @@ -51,7 +51,6 @@ net = [
"mio/os-poll",
"mio/os-ext",
"mio/net",
"socket2",
"winapi/namedpipeapi",
]
process = [
Expand Down Expand Up @@ -97,11 +96,14 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.1", optional = true }
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
#mio = { version = "0.8.1", optional = true }
mio = { git = "https://github.com/tokio-rs/mio/", branch = "master", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
socket2 = { version = "0.4.4", features = [ "all" ] }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -72,6 +72,7 @@ pub(super) struct Inner {
io_dispatch: RwLock<IoDispatcher>,

/// Used to wake up the reactor from a call to `turn`.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,

metrics: IoDriverMetrics,
Expand Down Expand Up @@ -115,6 +116,7 @@ impl Driver {
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;

Expand All @@ -129,6 +131,7 @@ impl Driver {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
}),
Expand Down Expand Up @@ -299,6 +302,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
#[cfg(not(target_os = "wasi"))]
self.inner.waker.wake().expect("failed to wake I/O driver");
}
}
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -126,6 +126,7 @@ impl<E: Source> PollEvented<E> {

/// Deregisters the inner io from the registration and returns a Result containing the inner io.
#[cfg(any(feature = "net", feature = "process"))]
#[cfg(not(target_os = "wasi"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
self.registration.deregister(&mut inner)?;
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/lib.rs
Expand Up @@ -410,7 +410,9 @@ mod future;
pub mod io;
pub mod net;

#[cfg(not(target_arch = "wasm32-wasi"))]
mod loom;
#[cfg(not(target_arch = "wasm32-wasi"))]
mod park;

cfg_process! {
Expand Down
21 changes: 17 additions & 4 deletions tokio/src/macros/cfg.rs
Expand Up @@ -61,6 +61,7 @@ macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
Expand Down Expand Up @@ -154,13 +155,21 @@ macro_rules! cfg_not_io_util {

macro_rules! cfg_loom {
($($item:item)*) => {
$( #[cfg(loom)] $item )*
$(
#[cfg(loom)]
#[cfg(not(target_os = "wasi"))]
$item
)*
}
}

macro_rules! cfg_not_loom {
($($item:item)*) => {
$( #[cfg(not(loom))] $item )*
$(
#[cfg(any(not(loom), target_os = "wasi"))]
//#[cfg(target_os = "wasi")]
$item
)*
}
}

Expand Down Expand Up @@ -247,6 +256,7 @@ macro_rules! cfg_process {
#[cfg(feature = "process")]
#[cfg_attr(docsrs, doc(cfg(feature = "process")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -275,6 +285,7 @@ macro_rules! cfg_signal {
#[cfg(feature = "signal")]
#[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -451,7 +462,8 @@ macro_rules! cfg_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
)))]
$item
)*
Expand All @@ -465,7 +477,8 @@ macro_rules! cfg_not_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
))]
$item
)*
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/net/mod.rs
Expand Up @@ -23,7 +23,7 @@
//! [`UnixDatagram`]: UnixDatagram

mod addr;
#[cfg(feature = "net")]
#[cfg(all(feature = "net", not(target_os = "wasi")))]
pub(crate) use addr::to_socket_addrs;
pub use addr::ToSocketAddrs;

Expand All @@ -33,10 +33,13 @@ cfg_net! {

pub mod tcp;
pub use tcp::listener::TcpListener;
#[cfg(not(target_os = "wasi"))]
pub use tcp::socket::TcpSocket;
pub use tcp::stream::TcpStream;

#[cfg(not(target_os = "wasi"))]
mod udp;
#[cfg(not(target_os = "wasi"))]
pub use udp::UdpSocket;
}

Expand Down
19 changes: 19 additions & 0 deletions tokio/src/net/tcp/listener.rs
@@ -1,5 +1,7 @@
use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;

#[cfg(not(target_os = "wasi"))]
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
Expand Down Expand Up @@ -94,6 +96,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = to_socket_addrs(addr).await?;

Expand All @@ -114,6 +117,7 @@ impl TcpListener {
}))
}

#[cfg(not(target_os = "wasi"))]
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
Expand Down Expand Up @@ -249,6 +253,7 @@ impl TcpListener {
/// [`tokio::net::TcpListener`]: TcpListener
/// [`std::net::TcpListener`]: std::net::TcpListener
/// [`set_nonblocking`]: fn@std::net::TcpListener::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
#[cfg(unix)]
{
Expand All @@ -269,6 +274,7 @@ impl TcpListener {
}
}

#[cfg(not(target_os = "wasi"))]
pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
Expand Down Expand Up @@ -297,6 +303,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
Expand Down Expand Up @@ -384,6 +391,18 @@ mod sys {
}
}

#[cfg(target_os = "wasi")]
mod sys {
use super::TcpListener;
use std::os::wasi::prelude::*;

impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
}

#[cfg(windows)]
mod sys {
use super::TcpListener;
Expand Down
1 change: 1 addition & 0 deletions tokio/src/net/tcp/socket.rs
@@ -1,3 +1,4 @@
#![cfg(not(target_os = "wasi"))]
use crate::net::{TcpListener, TcpStream};

use std::fmt;
Expand Down
23 changes: 23 additions & 0 deletions tokio/src/net/tcp/stream.rs
@@ -1,7 +1,11 @@
#[cfg(not(target_os = "wasi"))]
use crate::future::poll_fn;

use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};

#[cfg(not(target_os = "wasi"))]
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
Expand Down Expand Up @@ -108,6 +112,7 @@ impl TcpStream {
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
#[cfg(not(target_os = "wasi"))]
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;

Expand All @@ -129,11 +134,13 @@ impl TcpStream {
}

/// Establishes a connection to the specified `addr`.
#[cfg(not(target_os = "wasi"))]
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
TcpStream::connect_mio(sys).await
}

#[cfg(not(target_os = "wasi"))]
pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
let stream = TcpStream::new(sys)?;

Expand Down Expand Up @@ -226,6 +233,7 @@ impl TcpStream {
/// [`tokio::net::TcpStream`]: TcpStream
/// [`std::net::TcpStream`]: std::net::TcpStream
/// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpStream> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -1096,10 +1104,17 @@ impl TcpStream {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn linger(&self) -> io::Result<Option<Duration>> {
socket2::SockRef::from(self).linger()
}

/// Returns `io::ErrorKind::Unsupported` as this is not supported on WebAssembly
#[cfg(target_os = "wasi")]
pub fn linger(&self) -> io::Result<Option<Duration>> {
Err(io::ErrorKind::Unsupported.into())
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
///
/// This option controls the action taken when a stream has unsent messages and the stream is
Expand All @@ -1121,10 +1136,18 @@ impl TcpStream {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
socket2::SockRef::from(self).set_linger(dur)
}

/// Returns `io::ErrorKind::Unsupported` as this is not supported on WebAssembly
#[cfg(target_os = "wasi")]
#[allow(unused_variables)]
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
Err(io::ErrorKind::Unsupported.into())
}

/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`].
Expand Down

0 comments on commit 95fcfa7

Please sign in to comment.