From a55272a13db90b31af0a4efd4c1f2ab60c05a939 Mon Sep 17 00:00:00 2001 From: Richard Zak Date: Tue, 17 May 2022 21:51:15 -0400 Subject: [PATCH] feat: support for target `wasm32-wasi` Signed-off-by: Richard Zak Co-authored-by: Harald Hoyer --- .cargo/config | 2 +- tokio-util/src/lib.rs | 1 + tokio-util/src/net/mod.rs | 7 +++++ tokio/Cargo.toml | 8 ++++-- tokio/src/io/driver/mod.rs | 10 +++++++ tokio/src/io/driver/registration.rs | 1 - tokio/src/io/poll_evented.rs | 1 + tokio/src/lib.rs | 2 ++ tokio/src/macros/cfg.rs | 24 +++++++++++++--- tokio/src/net/mod.rs | 21 ++++++++------ tokio/src/net/tcp/listener.rs | 43 ++++++++++++++++++++++++++++- tokio/src/net/tcp/socket.rs | 2 ++ tokio/src/net/tcp/stream.rs | 23 +++++++++++++++ 13 files changed, 126 insertions(+), 19 deletions(-) diff --git a/.cargo/config b/.cargo/config index df8858986f3..6b807fda6bc 100644 --- a/.cargo/config +++ b/.cargo/config @@ -1,2 +1,2 @@ # [build] -# rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file +# rustflags = ["--cfg", "tokio_unstable"] diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index e4876a58a80..524fc4705dd 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -29,6 +29,7 @@ cfg_codec! { } cfg_net! { + #[cfg(not(target_arch = "wasm32"))] pub mod udp; pub mod net; } diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 4817e10d0f3..04c1a51b219 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -39,9 +39,16 @@ impl Listener for tokio::net::TcpListener { Self::poll_accept(self, cx) } + #[cfg(not(target_os = "wasi"))] fn local_addr(&self) -> Result { self.local_addr().map(Into::into) } + + #[cfg(target_os = "wasi")] + fn local_addr(&self) -> Result { + use std::io; + Err(io::ErrorKind::Unsupported.into()) + } } /// Future for accepting a new connection from a listener. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 7a297835045..2a9a594522f 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -51,7 +51,6 @@ net = [ "mio/os-poll", "mio/os-ext", "mio/net", - "socket2", "winapi/namedpipeapi", ] process = [ @@ -97,11 +96,14 @@ pin-project-lite = "0.2.0" bytes = { version = "1.0.0", optional = true } once_cell = { version = "1.5.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.8.1", optional = true } -socket2 = { version = "0.4.4", optional = true, features = [ "all" ] } +#mio = { version = "0.8.1", optional = true } +mio = { git = "https://github.com/tokio-rs/mio/", branch = "master", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.12.0", optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +socket2 = { version = "0.4.4", features = [ "all" ] } + # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 66bc3182206..f40bcd8d72c 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -72,6 +72,7 @@ pub(super) struct Inner { io_dispatch: RwLock, /// Used to wake up the reactor from a call to `turn`. + #[cfg(not(target_os = "wasi"))] waker: mio::Waker, metrics: IoDriverMetrics, @@ -115,8 +116,15 @@ impl Driver { /// creation. pub(crate) fn new() -> io::Result { let poll = mio::Poll::new()?; + #[cfg(not(target_os = "wasi"))] let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + + //#[cfg(not(target_os = "wasi"))] let registry = poll.registry().try_clone()?; + //#[cfg(target_os = "wasi")] + //let registry = poll.registry(); + + // let registry = poll.registry().clone(); let slab = Slab::new(); let allocator = slab.allocator(); @@ -129,6 +137,7 @@ impl Driver { inner: Arc::new(Inner { registry, io_dispatch: RwLock::new(IoDispatcher::new(allocator)), + #[cfg(not(target_os = "wasi"))] waker, metrics: IoDriverMetrics::default(), }), @@ -299,6 +308,7 @@ impl Handle { /// blocked in `turn`, then the next call to `turn` will not block and /// return immediately. fn wakeup(&self) { + #[cfg(not(target_os = "wasi"))] self.inner.waker.wake().expect("failed to wake I/O driver"); } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index c9393650c20..436701197fd 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -238,7 +238,6 @@ cfg_io_readiness! { pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { loop { let event = self.readiness(interest).await?; - match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index ce4c1426acc..83150ab76bb 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -126,6 +126,7 @@ impl PollEvented { /// Deregisters the inner io from the registration and returns a Result containing the inner io. #[cfg(any(feature = "net", feature = "process"))] + #[cfg(not(target_os = "wasi"))] pub(crate) fn into_inner(mut self) -> io::Result { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. self.registration.deregister(&mut inner)?; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index c0d7e6252e4..d4f8ecd5e0c 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -410,7 +410,9 @@ mod future; pub mod io; pub mod net; +#[cfg(not(target_arch = "wasm32-wasi"))] mod loom; +#[cfg(not(target_arch = "wasm32-wasi"))] mod park; cfg_process! { diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 45ae5f9913a..3e98e876316 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -61,6 +61,7 @@ macro_rules! cfg_fs { ($($item:item)*) => { $( #[cfg(feature = "fs")] + #[cfg(not(target_arch = "wasm32"))] #[cfg_attr(docsrs, doc(cfg(feature = "fs")))] $item )* @@ -86,6 +87,7 @@ macro_rules! cfg_io_driver { feature = "process", all(unix, feature = "signal"), ))))] + //#[cfg(not(target_arch = "wasm32"))] $item )* } @@ -99,6 +101,7 @@ macro_rules! cfg_io_driver_impl { feature = "process", all(unix, feature = "signal"), ))] + //#[cfg(not(target_arch = "wasm32"))] $item )* } @@ -112,6 +115,7 @@ macro_rules! cfg_not_io_driver { feature = "process", all(unix, feature = "signal"), )))] + //#[cfg(not(target_arch = "wasm32"))] $item )* } @@ -154,13 +158,21 @@ macro_rules! cfg_not_io_util { macro_rules! cfg_loom { ($($item:item)*) => { - $( #[cfg(loom)] $item )* + $( + #[cfg(loom)] + #[cfg(not(target_os = "wasi"))] + $item + )* } } macro_rules! cfg_not_loom { ($($item:item)*) => { - $( #[cfg(not(loom))] $item )* + $( + #[cfg(not(loom))] + #[cfg(target_os = "wasi")] + $item + )* } } @@ -247,6 +259,7 @@ macro_rules! cfg_process { #[cfg(feature = "process")] #[cfg_attr(docsrs, doc(cfg(feature = "process")))] #[cfg(not(loom))] + #[cfg(not(target_arch = "wasm32"))] $item )* } @@ -275,6 +288,7 @@ macro_rules! cfg_signal { #[cfg(feature = "signal")] #[cfg_attr(docsrs, doc(cfg(feature = "signal")))] #[cfg(not(loom))] + #[cfg(not(target_arch = "wasm32"))] $item )* } @@ -451,7 +465,8 @@ macro_rules! cfg_has_atomic_u64 { target_arch = "arm", target_arch = "mips", target_arch = "powerpc", - target_arch = "riscv32" + target_arch = "riscv32", + target_arch = "wasm32" )))] $item )* @@ -465,7 +480,8 @@ macro_rules! cfg_not_has_atomic_u64 { target_arch = "arm", target_arch = "mips", target_arch = "powerpc", - target_arch = "riscv32" + target_arch = "riscv32", + target_arch = "wasm32" ))] $item )* diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index 0b8c1ecd194..512a8d885a4 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -23,7 +23,7 @@ //! [`UnixDatagram`]: UnixDatagram mod addr; -#[cfg(feature = "net")] +#[cfg(all(feature = "net", not(target_os = "wasi")))] pub(crate) use addr::to_socket_addrs; pub use addr::ToSocketAddrs; @@ -33,20 +33,23 @@ cfg_net! { pub mod tcp; pub use tcp::listener::TcpListener; - pub use tcp::socket::TcpSocket; pub use tcp::stream::TcpStream; - mod udp; - pub use udp::UdpSocket; } cfg_net_unix! { - pub mod unix; - pub use unix::datagram::socket::UnixDatagram; - pub use unix::listener::UnixListener; - pub use unix::stream::UnixStream; + pub mod unix; + pub use unix::datagram::socket::UnixDatagram; + pub use unix::listener::UnixListener; + pub use unix::stream::UnixStream; + pub use tcp::socket::TcpSocket; + mod udp; + pub use udp::UdpSocket; } cfg_net_windows! { - pub mod windows; + pub mod windows; + pub use tcp::socket::TcpSocket; + mod udp; + pub use udp::UdpSocket; } diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 8aecb21aaa9..c089f3a6ec5 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -1,10 +1,13 @@ use crate::io::{Interest, PollEvented}; use crate::net::tcp::TcpStream; + +#[cfg(not(target_os = "wasi"))] use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; use std::io; +//use std::io::Error; use std::net::{self, SocketAddr}; use std::task::{Context, Poll}; @@ -94,6 +97,7 @@ impl TcpListener { /// Ok(()) /// } /// ``` + #[cfg(not(target_os = "wasi"))] pub async fn bind(addr: A) -> io::Result { let addrs = to_socket_addrs(addr).await?; @@ -114,6 +118,7 @@ impl TcpListener { })) } + #[cfg(not(target_os = "wasi"))] fn bind_addr(addr: SocketAddr) -> io::Result { let listener = mio::net::TcpListener::bind(addr)?; TcpListener::new(listener) @@ -154,13 +159,34 @@ impl TcpListener { /// } /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + println!("Tokio TcpListener::accept() {}:{}", file!(), line!()); let (mio, addr) = self .io .registration() .async_io(Interest::READABLE, || self.io.accept()) .await?; - + println!("Tokio TcpListener::accept() self.io.registration().async_io().await {}:{}", file!(), line!()); + let mut small_buffer = [0u8;200]; + match mio.peek(&mut small_buffer) { + Ok(x) => { + let buffer_string = String::from_utf8_lossy(&small_buffer[..x]).to_string(); + println!("Tokio TcpListener::accept() buffer {} size {} @ {}:{}", buffer_string, x, file!(), line!()); + } + Err(e) => { + println!("Tokio TcpListener::accept() error {} trying to peek() at the buffer {}:{}", e, file!(), line!()); + } + } let stream = TcpStream::new(mio)?; + match stream.try_read(&mut small_buffer) { + Ok(x) => { + let buffer_string = String::from_utf8_lossy(&small_buffer[..x]).to_string(); + println!("Tokio TcpListener::accept() buffer after TcpStream::new(): {} size {} @ {}:{}", buffer_string, x, file!(), line!()); + } + Err(e) => { + println!("Tokio TcpListener::accept() error {} trying to TcpStream::try_read() at the buffer {}:{}", e, file!(), line!()); + } + } + println!("Tokio TcpListener::accept() TcpStream::new() {}:{}", file!(), line!()); Ok((stream, addr)) } @@ -249,6 +275,7 @@ impl TcpListener { /// [`tokio::net::TcpListener`]: TcpListener /// [`std::net::TcpListener`]: std::net::TcpListener /// [`set_nonblocking`]: fn@std::net::TcpListener::set_nonblocking + #[cfg(not(target_os = "wasi"))] pub fn into_std(self) -> io::Result { #[cfg(unix)] { @@ -269,6 +296,7 @@ impl TcpListener { } } + #[cfg(not(target_os = "wasi"))] pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result { let io = PollEvented::new(listener)?; Ok(TcpListener { io }) @@ -297,6 +325,7 @@ impl TcpListener { /// Ok(()) /// } /// ``` + #[cfg(not(target_os = "wasi"))] pub fn local_addr(&self) -> io::Result { self.io.local_addr() } @@ -384,6 +413,18 @@ mod sys { } } +#[cfg(target_os = "wasi")] +mod sys { + use super::TcpListener; + use std::os::wasi::prelude::*; + + impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } + } +} + #[cfg(windows)] mod sys { use super::TcpListener; diff --git a/tokio/src/net/tcp/socket.rs b/tokio/src/net/tcp/socket.rs index bc93e83020f..27dcf33cb3d 100644 --- a/tokio/src/net/tcp/socket.rs +++ b/tokio/src/net/tcp/socket.rs @@ -1,3 +1,5 @@ +#![cfg(not(target_os = "wasi"))] + use crate::net::{TcpListener, TcpStream}; use std::fmt; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 204d9ca256c..6d81f5dd5da 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,7 +1,11 @@ +#[cfg(not(target_os = "wasi"))] use crate::future::poll_fn; + use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; + +#[cfg(not(target_os = "wasi"))] use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -108,6 +112,7 @@ impl TcpStream { /// /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + #[cfg(not(target_os = "wasi"))] pub async fn connect(addr: A) -> io::Result { let addrs = to_socket_addrs(addr).await?; @@ -129,11 +134,13 @@ impl TcpStream { } /// Establishes a connection to the specified `addr`. + #[cfg(not(target_os = "wasi"))] async fn connect_addr(addr: SocketAddr) -> io::Result { let sys = mio::net::TcpStream::connect(addr)?; TcpStream::connect_mio(sys).await } + #[cfg(not(target_os = "wasi"))] pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result { let stream = TcpStream::new(sys)?; @@ -226,6 +233,7 @@ impl TcpStream { /// [`tokio::net::TcpStream`]: TcpStream /// [`std::net::TcpStream`]: std::net::TcpStream /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking + #[cfg(not(target_os = "wasi"))] pub fn into_std(self) -> io::Result { #[cfg(unix)] { @@ -1096,10 +1104,17 @@ impl TcpStream { /// # Ok(()) /// # } /// ``` + #[cfg(not(target_os = "wasi"))] pub fn linger(&self) -> io::Result> { socket2::SockRef::from(self).linger() } + /// Returns `io::ErrorKind::Unsupported` as this is not supported on WebAssembly + #[cfg(target_os = "wasi")] + pub fn linger(&self) -> io::Result> { + Err(io::ErrorKind::Unsupported.into()) + } + /// Sets the linger duration of this socket by setting the SO_LINGER option. /// /// This option controls the action taken when a stream has unsent messages and the stream is @@ -1121,10 +1136,18 @@ impl TcpStream { /// # Ok(()) /// # } /// ``` + #[cfg(not(target_os = "wasi"))] pub fn set_linger(&self, dur: Option) -> io::Result<()> { socket2::SockRef::from(self).set_linger(dur) } + /// Returns `io::ErrorKind::Unsupported` as this is not supported on WebAssembly + #[cfg(target_os = "wasi")] + #[allow(unused_variables)] + pub fn set_linger(&self, dur: Option) -> io::Result<()> { + Err(io::ErrorKind::Unsupported.into()) + } + /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`].