From 919d82737884f1ab1ef72c0a0097f4628e3b2204 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 1 Oct 2021 17:13:11 -0400 Subject: [PATCH 01/12] util/io: Add `SyncIoBridge` I'm doing quite a bit of stuff inside `spawn_blocking` because I need to use some large synchronous APIs. I found it not entirely obvious how to "bridge" the world of async I/O with sync I/O. Since we have a handy "tokio-util" module with an "io" feature, these small helpers seem like a good fit hopefully! Extracted these from my code in https://github.com/ostreedev/ostree-rs-ext/blob/main/lib/src/async_util.rs --- tokio-util/src/io/mod.rs | 8 +++- tokio-util/src/io/sync_bridge.rs | 76 ++++++++++++++++++++++++++++++ tokio-util/tests/io_sync_bridge.rs | 43 +++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 tokio-util/src/io/sync_bridge.rs create mode 100644 tokio-util/tests/io_sync_bridge.rs diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index eec74448890..330d768ace3 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -1,16 +1,22 @@ //! Helpers for IO related tasks. //! -//! These types are often used in combination with hyper or reqwest, as they +//! The stream types are often used in combination with hyper or reqwest, as they //! allow converting between a hyper [`Body`] and [`AsyncRead`]. //! +//! The [`SyncIoBridge`] type converts from the world of async I/O +//! to synchronous I/O; this may often come up when using synchronous APIs +//! inside [`tokio::task::spawn_blocking`]. +//! //! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html //! [`AsyncRead`]: tokio::io::AsyncRead mod read_buf; mod reader_stream; mod stream_reader; +mod sync_bridge; pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; +pub use self::sync_bridge::SyncIoBridge; pub use crate::util::{poll_read_buf, poll_write_buf}; diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs new file mode 100644 index 00000000000..8e8543a09b4 --- /dev/null +++ b/tokio-util/src/io/sync_bridge.rs @@ -0,0 +1,76 @@ +use std::io::{Read, Write}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or +/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. +#[derive(Debug)] +pub struct SyncIoBridge { + src: T, + rt: tokio::runtime::Handle, +} + +impl Read for SyncIoBridge { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let src = &mut self.src; + self.rt.block_on(src.read(buf)) + } +} + +impl Write for SyncIoBridge { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let src = &mut self.src; + self.rt.block_on(src.write(buf)) + } + + fn flush(&mut self) -> std::io::Result<()> { + let src = &mut self.src; + self.rt.block_on(src.flush()) + } + + fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { + let src = &mut self.src; + self.rt.block_on(src.write_all(buf)) + } + + fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result { + let src = &mut self.src; + self.rt.block_on(src.write_vectored(bufs)) + } +} + +// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time +// of this writing still unstable, we expose this as part of a standalone method. +impl SyncIoBridge { + /// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes. + /// + /// See [`tokio::io::AsyncWrite::is_write_vectored`]. + pub fn is_write_vectored(&self) -> bool { + self.src.is_write_vectored() + } +} + +impl SyncIoBridge { + /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or + /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. + /// + /// This is useful with e.g. [`tokio::task::spawn_blocking`]. + /// + /// This works via capturing a handle to the current thread's runtime with [`tokio::runtime::Handle::current`]. + /// Synchronous I/O will use that handle to block on the backing asynchronous source. + /// + /// # Panic + /// + /// This will panic if called outside the context of a Tokio runtime. + pub fn new(src: T) -> Self { + Self::new_with_handle(src, tokio::runtime::Handle::current()) + } + + /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or + /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. + /// + /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may + /// be invoked outside of an asynchronous context. + pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self { + Self { src, rt } + } +} diff --git a/tokio-util/tests/io_sync_bridge.rs b/tokio-util/tests/io_sync_bridge.rs new file mode 100644 index 00000000000..e9a392f4774 --- /dev/null +++ b/tokio-util/tests/io_sync_bridge.rs @@ -0,0 +1,43 @@ +#![cfg(feature = "io")] + +use std::error::Error; +use std::io::{Cursor, Read, Result as IoResult}; +use tokio::io::AsyncRead; +use tokio_util::io::SyncIoBridge; + +async fn test_reader_len( + r: impl AsyncRead + Unpin + Send + 'static, + expected_len: usize, +) -> IoResult<()> { + let mut r = SyncIoBridge::new(r); + let res = tokio::task::spawn_blocking(move || { + let mut buf = Vec::new(); + r.read_to_end(&mut buf)?; + Ok::<_, std::io::Error>(buf) + }) + .await?; + assert_eq!(res?.len(), expected_len); + Ok(()) +} + +#[tokio::test] +async fn test_async_read_to_sync() -> Result<(), Box> { + test_reader_len(tokio::io::empty(), 0).await?; + let buf = b"hello world"; + test_reader_len(Cursor::new(buf), buf.len()).await?; + Ok(()) +} + +#[tokio::test] +async fn test_async_write_to_sync() -> Result<(), Box> { + let mut dest = Vec::new(); + let src = b"hello world"; + let dest = tokio::task::spawn_blocking(move || -> Result<_, String> { + let mut w = SyncIoBridge::new(Cursor::new(&mut dest)); + std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?; + Ok(dest) + }) + .await??; + assert_eq!(dest.as_slice(), src); + Ok(()) +} From ca0416486815a47c0f2a39044292bae8f40d73ae Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Mon, 4 Oct 2021 13:55:32 -0400 Subject: [PATCH 02/12] fix bounds --- tokio-util/src/io/sync_bridge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 8e8543a09b4..d02ab4d4050 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -49,7 +49,7 @@ impl SyncIoBridge { } } -impl SyncIoBridge { +impl SyncIoBridge { /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. /// From 0b35775425669159f497661c37b91babb11e2456 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Mon, 4 Oct 2021 13:58:32 -0400 Subject: [PATCH 03/12] update docs --- tokio-util/src/io/sync_bridge.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index d02ab4d4050..998c13dd563 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -53,10 +53,16 @@ impl SyncIoBridge { /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. /// - /// This is useful with e.g. [`tokio::task::spawn_blocking`]. + /// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`]. + /// It is hence OK to move this struct into a separate thread outside the runtime, as created + /// by e.g. [`tokio::task::spawn_blocking`]. /// - /// This works via capturing a handle to the current thread's runtime with [`tokio::runtime::Handle::current`]. - /// Synchronous I/O will use that handle to block on the backing asynchronous source. + /// Stated even more strongly: to make use of this bridge, you *must* move + /// it into a separate thread outside the runtime. The synchronous I/O will use the + /// underlying handle to block on the backing asynchronous source, via + /// [`tokio::runtime::Handle::block_on`]. As noted in the documentation for that + /// function, an attempt to `block_on` from an asynchronous execution context + /// will panic. /// /// # Panic /// @@ -69,7 +75,7 @@ impl SyncIoBridge { /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. /// /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may - /// be invoked outside of an asynchronous context. + /// be initially invoked outside of an asynchronous context. pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self { Self { src, rt } } From 2d5c9c83cf8a2f6e809192a06c8887df6d168e14 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Mon, 4 Oct 2021 14:15:21 -0400 Subject: [PATCH 04/12] forward more read methods --- tokio-util/src/io/sync_bridge.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 998c13dd563..f61a75a7a13 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -14,6 +14,23 @@ impl Read for SyncIoBridge { let src = &mut self.src; self.rt.block_on(src.read(buf)) } + + fn read_to_end(&mut self, buf: &mut Vec) -> std::io::Result { + let src = &mut self.src; + self.rt.block_on(src.read_to_end(buf)) + } + + fn read_to_string(&mut self, buf: &mut String) -> std::io::Result { + let src = &mut self.src; + self.rt.block_on(src.read_to_string(buf)) + } + + fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> { + let src = &mut self.src; + // The AsyncRead trait returns the count, synchronous doesn't. + let _n = self.rt.block_on(src.read_exact(buf))?; + Ok(()) + } } impl Write for SyncIoBridge { From 59a9566b50ff5b86b6b0d892616d74d2f7abf046 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 5 Oct 2021 09:37:09 -0400 Subject: [PATCH 05/12] Mention !Unpin types --- tokio-util/src/io/sync_bridge.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index f61a75a7a13..c059ec7751f 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -81,6 +81,10 @@ impl SyncIoBridge { /// function, an attempt to `block_on` from an asynchronous execution context /// will panic. /// + /// # Wrapping `!Unpin` types + /// + /// Use e.g. `SyncIoBridge::new(Box::pin(src))`. + /// /// # Panic /// /// This will panic if called outside the context of a Tokio runtime. From e6051a051339035170eadd0ac731284d9e06e21a Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 7 Oct 2021 09:58:02 -0400 Subject: [PATCH 06/12] If an empty commit is pushed in the git tree, does the CI hear it? From 29795db47321c49570d342917fb348427f0c4626 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 8 Oct 2021 08:47:51 -0400 Subject: [PATCH 07/12] Update tokio-util/src/io/sync_bridge.rs Co-authored-by: Alice Ryhl --- tokio-util/src/io/sync_bridge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index c059ec7751f..9be9446a7de 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -12,7 +12,7 @@ pub struct SyncIoBridge { impl Read for SyncIoBridge { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let src = &mut self.src; - self.rt.block_on(src.read(buf)) + self.rt.block_on(AsyncReadExt::read(src, buf)) } fn read_to_end(&mut self, buf: &mut Vec) -> std::io::Result { From 541011b2d509b04ef20ea5e302c2582898b710b8 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Wed, 20 Oct 2021 12:23:23 -0400 Subject: [PATCH 08/12] Have `io` depend on `tokio/io-util` for SyncIoBridge Necessary for the sync bridge work. I don't know whether this is OK. --- tokio-util/Cargo.toml | 2 +- tokio-util/src/io/sync_bridge.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 8c0481eca08..aafe1dca0ac 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -29,7 +29,7 @@ net = ["tokio/net"] compat = ["futures-io",] codec = [] time = ["tokio/time","slab"] -io = [] +io = ["tokio/io-util"] rt = ["tokio/rt"] __docs_rs = ["futures-util"] diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 9be9446a7de..b723a5695ff 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "io")] + use std::io::{Read, Write}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; From 2d8b60c4a96389935578b19316144b01379733b1 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 21 Oct 2021 09:00:50 -0400 Subject: [PATCH 09/12] Remove unncessary build constraint --- tokio-util/src/io/sync_bridge.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index b723a5695ff..9be9446a7de 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "io")] - use std::io::{Read, Write}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; From e7bab76c27fcbcf42e0f1432a8203b1cf01721dd Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 21 Oct 2021 09:47:24 -0400 Subject: [PATCH 10/12] util: Create a new `io-util` feature that depends on `rt` and `io` Move sync-bridge to it. --- tokio-util/Cargo.toml | 5 +++-- tokio-util/src/cfg.rs | 10 ++++++++++ tokio-util/src/io/mod.rs | 6 ++++-- tokio-util/tests/io_sync_bridge.rs | 2 +- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index aafe1dca0ac..f795d1d158a 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -23,13 +23,14 @@ categories = ["asynchronous"] default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io", "time", "net", "rt"] +full = ["codec", "compat", "io-util", "time", "net", "rt"] net = ["tokio/net"] compat = ["futures-io",] codec = [] time = ["tokio/time","slab"] -io = ["tokio/io-util"] +io = [] +io-util = ["io", "rt", "tokio/io-util"] rt = ["tokio/rt"] __docs_rs = ["futures-util"] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index b68782eb8db..e7ae09e07fc 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -38,6 +38,16 @@ macro_rules! cfg_io { } } +macro_rules! cfg_io_util { + ($($item:item)*) => { + $( + #[cfg(feature = "io-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + $item + )* + } +} + macro_rules! cfg_rt { ($($item:item)*) => { $( diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 330d768ace3..eb48a21fb98 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -13,10 +13,12 @@ mod read_buf; mod reader_stream; mod stream_reader; -mod sync_bridge; +cfg_io_util! { + mod sync_bridge; + pub use self::sync_bridge::SyncIoBridge; +} pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; -pub use self::sync_bridge::SyncIoBridge; pub use crate::util::{poll_read_buf, poll_write_buf}; diff --git a/tokio-util/tests/io_sync_bridge.rs b/tokio-util/tests/io_sync_bridge.rs index e9a392f4774..0d420857b50 100644 --- a/tokio-util/tests/io_sync_bridge.rs +++ b/tokio-util/tests/io_sync_bridge.rs @@ -1,4 +1,4 @@ -#![cfg(feature = "io")] +#![cfg(feature = "io-util")] use std::error::Error; use std::io::{Cursor, Read, Result as IoResult}; From 0a91e6de5c75674fd3bd7fe80e3480ed5b5a9709 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 21 Oct 2021 10:08:39 -0400 Subject: [PATCH 11/12] util: We only need io-util macro if we have io --- tokio-util/src/cfg.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index e7ae09e07fc..4035255aff0 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -38,13 +38,15 @@ macro_rules! cfg_io { } } -macro_rules! cfg_io_util { - ($($item:item)*) => { - $( - #[cfg(feature = "io-util")] - #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] - $item - )* +cfg_io! { + macro_rules! cfg_io_util { + ($($item:item)*) => { + $( + #[cfg(feature = "io-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + $item + )* + } } } From 0584c0367bd25bab75b0b978cf72c7edec7dd4f5 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 21 Oct 2021 15:38:43 -0400 Subject: [PATCH 12/12] Update tokio-util/Cargo.toml Co-authored-by: Alice Ryhl --- tokio-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index f795d1d158a..9229cb26f54 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -30,7 +30,7 @@ compat = ["futures-io",] codec = [] time = ["tokio/time","slab"] io = [] -io-util = ["io", "rt", "tokio/io-util"] +io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt"] __docs_rs = ["futures-util"]