Skip to content

Commit

Permalink
util/io: Add SyncIOBridge
Browse files Browse the repository at this point in the history
I'm doing quite a bit of stuff inside `spawn_blocking` because
I need to use some large synchronous APIs.

I found it not entirely obvious how to "bridge" the world of
async I/O with sync I/O.

Since we have a handy "tokio-util" module with an "io" feature,
these small helpers seem like a good fit hopefully!

Extracted these from my code in https://github.com/ostreedev/ostree-rs-ext/blob/main/lib/src/async_util.rs
  • Loading branch information
cgwalters committed Oct 3, 2021
1 parent 9ff7d8c commit 913b94e
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 1 deletion.
8 changes: 7 additions & 1 deletion tokio-util/src/io/mod.rs
@@ -1,16 +1,22 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! The stream types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! The [`ReadBridge`] and [`WriteBridge`] convert from the world of async I/O
//! to synchronous I/O; this may often come up when using synchronous APIs
//! inside [`tokio::task::spawn_blocking`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod read_buf;
mod reader_stream;
mod stream_reader;
mod sync_bridge;

pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use self::sync_bridge::SyncIOBridge;
pub use crate::util::{poll_read_buf, poll_write_buf};
46 changes: 46 additions & 0 deletions tokio-util/src/io/sync_bridge.rs
@@ -0,0 +1,46 @@
use pin_project_lite::pin_project;
use std::io::{Read, Write};
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

pin_project! {
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
#[derive(Debug)]
pub struct SyncIOBridge<T> {
src: Pin<Box<T>>,
#[pin]
rt: tokio::runtime::Handle,
}
}

impl<T: AsyncRead> Read for SyncIOBridge<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read(buf))
}
}

impl<T: AsyncWrite> Write for SyncIOBridge<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write(buf))
}

fn flush(&mut self) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.flush())
}
}

impl<T: AsyncRead> SyncIOBridge<T> {
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// This is useful with e.g. [`tokio::task::spawn_blocking`].
pub fn new(src: T) -> Self {
let src = Box::pin(src);
let rt = tokio::runtime::Handle::current();
Self { src, rt }
}
}
43 changes: 43 additions & 0 deletions tokio-util/tests/io_sync_bridge.rs
@@ -0,0 +1,43 @@
#![cfg(feature = "io")]

use std::error::Error;
use std::io::{Cursor, Read, Result as IoResult};
use tokio::io::AsyncRead;
use tokio_util::io::SyncIOBridge;

async fn test_reader_len(
r: impl AsyncRead + Unpin + Send + 'static,
expected_len: usize,
) -> IoResult<()> {
let mut r = SyncIOBridge::new(r);
let res = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
r.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
.await?;
assert_eq!(res?.len(), expected_len);
Ok(())
}

#[tokio::test]
async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> {
test_reader_len(tokio::io::empty(), 0).await?;
let buf = b"hello world";
test_reader_len(Cursor::new(buf), buf.len()).await?;
Ok(())
}

#[tokio::test]
async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
let mut dest = Vec::new();
let src = b"hello world";
let dest = tokio::task::spawn_blocking(move || -> Result<_, String> {
let mut w = SyncIOBridge::new(Cursor::new(&mut dest));
std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?;
Ok(dest)
})
.await??;
assert_eq!(dest.as_slice(), src);
Ok(())
}

0 comments on commit 913b94e

Please sign in to comment.