Skip to content

Commit

Permalink
util/io: add SyncIoBridge (#4146)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Oct 22, 2021
1 parent 83aeae8 commit 2734fa9
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 2 deletions.
3 changes: 2 additions & 1 deletion tokio-util/Cargo.toml
Expand Up @@ -23,13 +23,14 @@ categories = ["asynchronous"]
default = []

# Shorthand for enabling everything
full = ["codec", "compat", "io", "time", "net", "rt"]
full = ["codec", "compat", "io-util", "time", "net", "rt"]

net = ["tokio/net"]
compat = ["futures-io",]
codec = []
time = ["tokio/time","slab"]
io = []
io-util = ["io", "tokio/rt", "tokio/io-util"]
rt = ["tokio/rt"]

__docs_rs = ["futures-util"]
Expand Down
12 changes: 12 additions & 0 deletions tokio-util/src/cfg.rs
Expand Up @@ -38,6 +38,18 @@ macro_rules! cfg_io {
}
}

cfg_io! {
macro_rules! cfg_io_util {
($($item:item)*) => {
$(
#[cfg(feature = "io-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
$item
)*
}
}
}

macro_rules! cfg_rt {
($($item:item)*) => {
$(
Expand Down
10 changes: 9 additions & 1 deletion tokio-util/src/io/mod.rs
@@ -1,14 +1,22 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! The stream types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! The [`SyncIoBridge`] type converts from the world of async I/O
//! to synchronous I/O; this may often come up when using synchronous APIs
//! inside [`tokio::task::spawn_blocking`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod read_buf;
mod reader_stream;
mod stream_reader;
cfg_io_util! {
mod sync_bridge;
pub use self::sync_bridge::SyncIoBridge;
}

pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
Expand Down
103 changes: 103 additions & 0 deletions tokio-util/src/io/sync_bridge.rs
@@ -0,0 +1,103 @@
use std::io::{Read, Write};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
#[derive(Debug)]
pub struct SyncIoBridge<T> {
src: T,
rt: tokio::runtime::Handle,
}

impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(AsyncReadExt::read(src, buf))
}

fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read_to_end(buf))
}

fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read_to_string(buf))
}

fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
let src = &mut self.src;
// The AsyncRead trait returns the count, synchronous doesn't.
let _n = self.rt.block_on(src.read_exact(buf))?;
Ok(())
}
}

impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write(buf))
}

fn flush(&mut self) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.flush())
}

fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.write_all(buf))
}

fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write_vectored(bufs))
}
}

// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
// of this writing still unstable, we expose this as part of a standalone method.
impl<T: AsyncWrite> SyncIoBridge<T> {
/// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
///
/// See [`tokio::io::AsyncWrite::is_write_vectored`].
pub fn is_write_vectored(&self) -> bool {
self.src.is_write_vectored()
}
}

impl<T: Unpin> SyncIoBridge<T> {
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
/// It is hence OK to move this struct into a separate thread outside the runtime, as created
/// by e.g. [`tokio::task::spawn_blocking`].
///
/// Stated even more strongly: to make use of this bridge, you *must* move
/// it into a separate thread outside the runtime. The synchronous I/O will use the
/// underlying handle to block on the backing asynchronous source, via
/// [`tokio::runtime::Handle::block_on`]. As noted in the documentation for that
/// function, an attempt to `block_on` from an asynchronous execution context
/// will panic.
///
/// # Wrapping `!Unpin` types
///
/// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
///
/// # Panic
///
/// This will panic if called outside the context of a Tokio runtime.
pub fn new(src: T) -> Self {
Self::new_with_handle(src, tokio::runtime::Handle::current())
}

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
/// be initially invoked outside of an asynchronous context.
pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
Self { src, rt }
}
}
43 changes: 43 additions & 0 deletions tokio-util/tests/io_sync_bridge.rs
@@ -0,0 +1,43 @@
#![cfg(feature = "io-util")]

use std::error::Error;
use std::io::{Cursor, Read, Result as IoResult};
use tokio::io::AsyncRead;
use tokio_util::io::SyncIoBridge;

async fn test_reader_len(
r: impl AsyncRead + Unpin + Send + 'static,
expected_len: usize,
) -> IoResult<()> {
let mut r = SyncIoBridge::new(r);
let res = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
r.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
.await?;
assert_eq!(res?.len(), expected_len);
Ok(())
}

#[tokio::test]
async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> {
test_reader_len(tokio::io::empty(), 0).await?;
let buf = b"hello world";
test_reader_len(Cursor::new(buf), buf.len()).await?;
Ok(())
}

#[tokio::test]
async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
let mut dest = Vec::new();
let src = b"hello world";
let dest = tokio::task::spawn_blocking(move || -> Result<_, String> {
let mut w = SyncIoBridge::new(Cursor::new(&mut dest));
std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?;
Ok(dest)
})
.await??;
assert_eq!(dest.as_slice(), src);
Ok(())
}

0 comments on commit 2734fa9

Please sign in to comment.