Implement AsyncRead for async response body: #482

Closed
theduke opened this issue Mar 27, 2019 · 10 comments

Comments

@theduke
Contributor

theduke commented Mar 27, 2019

reqwest::r#async::Decoder does not currently implement tokio::io::AsyncRead, which makes e.g. simple copying with tokio::io::copy impossible.

Would be nice to implement this.

@theduke theduke changed the title Async response body: AsyncRead Implement AsyncRead for async response body: Mar 27, 2019
@dtantsur

+1. I've had a twitter exchange on this topic, seems not so hard: https://twitter.com/creepy_owlet/status/1210529708431073280.

@dtantsur

dtantsur commented Jan 1, 2020

That's the (mostly untested) code I ended up with (for both directions): https://github.com/dtantsur/rust-openstack/blob/460be44dd2f9bfcc6d1277a7195e0c924e56ddf4/src/object_storage/utils.rs

@seanmonstar does this ^^^ look at least remotely acceptable to you?

@seanmonstar
Owner

The Response body in reqwest is a Stream, which cannot be freely exposed as an AsyncRead. It requires some additional state to buffer one item of the Stream, which read then copies from. The futures crate has TryStreamExt::into_async_read, which converts into a type with that overhead. Perhaps tokio can gain a similar utility.
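
To illustrate the "additional state" point, here is a minimal synchronous analogy using only the standard library. ChunkReader is a hypothetical name, not a type from reqwest, futures, or tokio; it adapts an iterator of byte chunks (standing in for the Stream) into a std::io::Read by holding on to one chunk and a read position. An async adapter needs the same kind of bookkeeping.

```rust
use std::io::{self, Read};

/// Hypothetical adapter for illustration only: turns an iterator of byte
/// chunks into a `Read` by buffering one chunk at a time.
struct ChunkReader<I> {
    chunks: I,
    current: Vec<u8>, // the one buffered chunk
    pos: usize,       // how much of `current` has already been handed out
}

impl<I: Iterator<Item = Vec<u8>>> ChunkReader<I> {
    fn new(chunks: I) -> Self {
        ChunkReader { chunks, current: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for ChunkReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch the next chunk once the buffered one is used up
        // (empty chunks are skipped by looping).
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                None => return Ok(0), // end of the chunk sequence
            }
        }
        // Copy as much of the buffered chunk as fits into `buf`.
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let chunks = vec![b"hello ".to_vec(), b"world".to_vec()];
    let mut reader = ChunkReader::new(chunks.into_iter());
    let mut text = String::new();
    reader.read_to_string(&mut text)?;
    assert_eq!(text, "hello world");
    Ok(())
}
```

The `current` and `pos` fields are exactly the overhead in question: a chunk can be larger than the caller's buffer, so the leftover has to live somewhere between calls to read.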

@dtantsur

dtantsur commented Jan 2, 2020

@seanmonstar the reason this keeps coming up (and will keep coming up) is that without some sort of Read functionality, Body and Response are weird citizens in the IO world (even your own synchronous Response is Read!). Representing Response as a Stream is handy too, but it's a leaky abstraction that exposes how it's implemented. It's natural for hyper but quite surprising for a library aiming to be high-level. It also makes my life as a downstream library author harder because the problem is now pushed to my level.

@benkay86

benkay86 commented Feb 3, 2020

EDIT: The futures crate is intended to define a common vocabulary of runtime/reactor-agnostic primitives for crates like tokio to build on. However, the futures crate is not part of the Rust standard library, and so there is room for disagreement over what these primitives (like AsyncRead) should look like. As @seanmonstar points out, the issue here is really with tokio deciding to use its own version of the AsyncRead trait that is not compatible with the one in the futures crate. There are valid reasons for this as outlined on Reddit and in this PR. Unfortunately, this divergence in compatibility makes matters difficult for us mere mortals who are trying to use the Rust async ecosystem.

If your project does not depend on the tokio runtime/reactor specifically, you could potentially use async-std instead. Unlike tokio, async-std uses AsyncRead from the futures crate, which means you should be able to:

  1. Use reqwest::get("https://some-url.com").await? or whatever to get a Response.
  2. Convert your Response to a Stream using bytes_stream().
  3. Use into_async_read to convert your Stream into a futures-compatible AsyncRead.
  4. Call async_std::io::copy(), which is analogous to its tokio counterpart except that it works on a futures-compatible AsyncRead, whereas tokio's does not.

If you want to stick with tokio as the runtime/reactor then here is one possible workaround:

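// Brings write_all() and flush() into scope for the buffered writer below
use tokio::io::AsyncWriteExt;

// URL to download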
let url = reqwest::Url::parse("https://upload.wikimedia.org/wikipedia/commons/1/13/Cute_kitten.jpg")?;

// Client object
let client = reqwest::Client::new();

// Client sends request for URL, await response
let mut response = client.get(url).send().await?;

// Make sure server responded OK.
if response.status() == reqwest::StatusCode::OK {
    // Create the output file and wrap it in a write buffer
    let outfile = std::path::PathBuf::from("kitten.jpg");
    let outfile = tokio::fs::File::create(outfile).await?;
    let mut outfile = tokio::io::BufWriter::new(outfile);
    
    // Do an asynchronous, buffered copy of the download to the output file
    while let Some(chunk) = response.chunk().await? {
        // write_all() writes the whole chunk; plain write() may write only part of it
        outfile.write_all(&chunk).await?;
    }
    
    // Must flush tokio::io::BufWriter manually.
    // It will *not* flush itself automatically when dropped.
    outfile.flush().await?;
}

EDIT EDIT: tokio now has a futures <--> tokio compatibility layer, see more in another comment below.

@dakom

dakom commented Feb 10, 2020

thanks @benkay86 ! I'm trying to get a grip on doing concurrent downloads... do you mind taking a look and giving tips on how to go from your snippet to a proper concurrent-stream downloader? Here's what I have so far (including your code in a standalone fn):

The idea is that download_and_write_images() gets an Iterator of (Url,PathBuf) pairs and should then download the url to the local path as quickly as possible:

use std::fs::{self, File};
use std::path::{Path, PathBuf};
use url::Url;
use tokio::stream::{iter};
use futures::stream::{StreamExt};
use tokio::io::{AsyncWriteExt};


async fn download_and_write(url:Url, path:PathBuf) -> Result<(Url, PathBuf), Box<dyn std::error::Error>> { 
    let mut response = reqwest::get(url.clone()).await?;

    let outfile = tokio::fs::File::create(&path).await?;
    let mut outfile = tokio::io::BufWriter::new(outfile);
    
    // Do an asynchronous, buffered copy of the download to the output file
    while let Some(chunk) = response.chunk().await? {
        // write_all() writes the whole chunk; plain write() may write only part of it
        outfile.write_all(&chunk).await?;
    }
    
    // Must flush tokio::io::BufWriter manually.
    // It will *not* flush itself automatically when dropped.
    outfile.flush().await?;

    Ok((url, path))
}
pub async fn download_and_write_images(list_iter:impl Iterator<Item = (Url, PathBuf)>) {
    let mut stream = 
        iter(list_iter)
            .map(|(url, path)| download_and_write(url, path))
            .buffer_unordered(10); //Is this helpful?

   //also, is this the best way... tried for_each_concurrent() but couldn't get it to work
    while let Some(res) = stream.next().await {
        let (url, path) = res.unwrap();
        println!("downloaded {} to {}", url, path.as_os_str().to_str().unwrap());
    }
}

@benkay86

First off, I would recommend using the following for your type-erased generic error type. I explain the rationale in my blog post on Rust errors, but the short version is that you probably want to be able to send errors between threads.

pub type BoxError = std::boxed::Box<dyn
	std::error::Error   // must implement Error to satisfy ?
	+ std::marker::Send // needed for threads
	+ std::marker::Sync // needed for threads
>;
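
As a small illustration of the "send errors between threads" point, here is a sketch using only the standard library. parse_on_worker is a hypothetical function invented for this example. It compiles because BoxError is Send; with a plain Box<dyn std::error::Error> as the error type, std::thread::spawn would reject the closure's return type.

```rust
use std::thread;

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Hypothetical example: parse a number on a worker thread and hand the
/// result (or the boxed error) back to the caller.
fn parse_on_worker(input: &'static str) -> Result<u32, BoxError> {
    let handle = thread::spawn(move || -> Result<u32, BoxError> {
        // `?` converts ParseIntError into BoxError automatically.
        let n: u32 = input.parse()?;
        Ok(n)
    });
    // join() only fails if the worker panicked; the inner Result is the
    // worker's own outcome, which crossed the thread boundary.
    handle.join().expect("worker thread panicked")
}

fn main() -> Result<(), BoxError> {
    assert_eq!(parse_on_worker("42")?, 42);
    assert!(parse_on_worker("not a number").is_err());
    Ok(())
}
```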

In my code I am doing almost exactly the same thing in terms of parallel downloads using the futures::stream module. I used a combination of buffer_unordered() and try_for_each() instead of a while let, although you could probably make it work either way. Try something like this (not tested):

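// Requires: use futures::stream::{StreamExt, TryStreamExt};
// Assumes download_and_write() returns Result<(Url, PathBuf), BoxError>,
// since try_for_each() needs the stream's error type to match the return type.
pub async fn download_and_write_images(list_iter: impl Iterator<Item = (Url, PathBuf)>) -> Result<(), BoxError> {
    futures::stream::iter(list_iter)
        .map(|(url, path)| download_and_write(url, path))
        .buffer_unordered(10) // this will try to do 10 downloads in parallel
        // try_for_each() passes on only the Ok values and stops at the first Err
        .try_for_each(|(url, path)| async move {
            println!("downloaded {} to {}", url, path.display());
            Ok(())
        })
        .await // without this, the stream would never be driven
}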

A few pearls here:

  • buffer_unordered(n) will try to do up to n downloads in parallel.
  • If you want to try to execute the whole stream concurrently (not necessarily advisable if all downloads are from the same server) then use try_for_each_concurrent() instead.
  • try_for_each() expects a closure that returns a future, most conveniently an async block.
  • try_for_each() expects that future to resolve to a Result<(),_>, which means you must return Ok(()) on success, and the error type must match the error type of the stream.
  • Note that theoretically println!() can block, although this is unlikely in most cases. See tokio::io::stdout for a truly async alternative.

Suppose you continue to use buffer_unordered(n) to limit the number of parallel downloads from a single server, but you would like to be able to download from two different servers in parallel for up to 2n concurrent connections. You could add another layer of concurrency like this:

// Group downloads from server 1.
let server1_downloads = tokio::spawn(async move {
    // initialization steps here if needed...
    download_and_write_images(server1_list).await
});

// Group downloads from server 2.
let server2_downloads = tokio::spawn(async move {
    // initialization steps here if needed...
    download_and_write_images(server2_list).await
});

// Wait for all downloads to finish.
// Downloads from servers 1 and 2 will run in parallel.
server1_downloads.await??;
server2_downloads.await??;

The pearls here:

  • tokio::spawn expects a future, most conveniently an async block.
  • You will probably need to do some initialization of variables moved into tokio::spawn because they are not scoped, i.e. they will need to be moved in even if that involves cloning.
  • You should await the result of download_and_write_images() inside tokio::spawn so that you do not return a nested future, but you should not unwrap the result.
  • At the final serverX_downloads.await there are two results to unwrap. The first is whether or not the thread/task was successfully joined. The second is the result of download_and_write_images().

Hope this helps get you started.

@dakom

dakom commented Feb 10, 2020

excellent, thanks!!

@benkay86

For anyone still following this, tokio now has a futures <--> tokio compatibility layer as of this PR. You can now use tokio::io::copy on a stream returned by reqwest just as @theduke originally requested.

I've posted some examples including this one of how to use the compatibility layer.

@Globidev

For anyone stumbling upon this, there is now also a tokio_util::io::StreamReader which does the job:

fn response_to_async_read(resp: reqwest::Response) -> impl tokio::io::AsyncRead {
    use futures::stream::TryStreamExt;

    let stream = resp.bytes_stream().map_err(std::io::Error::other);
    tokio_util::io::StreamReader::new(stream)
}
