seanmonstar / reqwest

An easy and powerful Rust HTTP Client
https://docs.rs/reqwest
Apache License 2.0
9.85k stars 1.11k forks source link

Implement AsyncRead for async response body: #482

Closed theduke closed 4 years ago

theduke commented 5 years ago

reqwest::r#async::Decoder does not currently implement tokio::io::AsyncRead, which makes eg simple copying with tokio::io::copy impossible.

Would be nice to implement this.

dtantsur commented 4 years ago

+1. I've had a twitter exchange on this topic, seems not so hard: https://twitter.com/creepy_owlet/status/1210529708431073280.

dtantsur commented 4 years ago

That's the (mostly untested) code I ended up with (for both directions): https://github.com/dtantsur/rust-openstack/blob/460be44dd2f9bfcc6d1277a7195e0c924e56ddf4/src/object_storage/utils.rs

@seanmonstar does this ^^^ look at least remotely acceptable to you?

seanmonstar commented 4 years ago

The Response body in reqwest is a Stream, which cannot freely expose as an AsyncRead. It requires some additional state to buffer one item of the Stream to then copy from in the read. The futures crate has TryStreamExt::into_async_read which converts into a type with that overhead. Perhaps tokio can gain a similar utility.

dtantsur commented 4 years ago

@seanmonstar the reason it gets (and will get) coming up is that without a sort of Read functionality, Body and Response are weird citizens in the IO world (even your own synchronous Response is Read!). Representing Response as a Stream is handy too, but it's a leaking abstraction exposing the way it's implemented. It's natural for hyper but quite surprising for a library aiming to be high-level. It also makes my life as a downstream library author harder because the problem is now pushed to my level.

benkay86 commented 4 years ago

EDIT: The futures crate is intended to define a common vocabulary of runtime/reactor-agnostic primitives for crates like tokio to build on. However, the futures crate is not part of the Rust standard library, and so there is room for diagreement over what these primitives (like AsyncRead) should look like. As @seanmonstar points out, the issue here is really with tokio deciding to use its own version of the AsyncRead trait that is not compatible with the one in the futures crate. There are valid reasons for this as outlined on Reddit and in this PR. Unfortunately, this divergence in compatibility makes matters difficult for us mere mortals who are trying to use the Rust async ecosystem.

If your project does not depend on the tokio runtime/reactor specifically, you could potentially use async-std instead. Unlike tokio, async-std uses AsyncRead from the futures crate, which means you should be able to:

  1. Use reqwest::get("https://some-url.com").await? or whatever to get a Response.
  2. Convert your Response to a Stream using bytes_stream().
  3. Use into_async_read to convert your Stream into a futures-compatible AsyncRead.
  4. Call async_std::io::copy(), which is analagous to its tokio counterpart except that works on futures-compatible AsyncRead whereas tokio does not.

If you want to stick with tokio as the runtime/reactor then here is one possible workaround:

// URL to download
let url = reqwest::Url::parse("https://upload.wikimedia.org/wikipedia/commons/1/13/Cute_kitten.jpg")?;

// Client object
let client = reqwest::Client::new();

// Client sends request for URL, await response
let mut response = client.get(url).send().await?;

// Make sure server responded OK.
if response.status() == reqwest::StatusCode::OK {
    // Create the output file and wrap it in a write buffer
    let outfile = std::path::PathBuf::from("kitten.jpg");
    let outfile = tokio::fs::File::create(outfile).await?;
    let mut outfile = tokio::io::BufWriter::new(outfile);

    // Do an asynchronous, buffered copy of the download to the output file
    while let Some(chunk) = response.chunk().await? {
        outfile.write(&chunk).await?;
    }

    // Must flush tokio::io::BufWriter manually.
    // It will *not* flush itself automatically when dropped.
    outfile.flush().await?;
}

EDIT EDIT: tokio now has a futures <--> tokio compatibility layer, see more in another comment below.

dakom commented 4 years ago

thanks @benkay86 ! I'm trying to get a grip on doing concurrent downloads... do you mind taking a look at giving tips for how to go from your snippet to a proper concurrent-stream downloader? Here's what I have so far (including your code in a standalone fn):

The idea is that download_and_write_images() gets an Iterator of (Url,PathBuf) pairs and should then download the url to the local path as quickly as possible:

use std::fs::{self, File};
use std::path::{Path, PathBuf};
use url::Url;
use tokio::stream::{iter};
use futures::stream::{StreamExt};
use tokio::io::{AsyncWriteExt};

async fn download_and_write(url:Url, path:PathBuf) -> Result<(Url, PathBuf), Box<dyn std::error::Error>> { 
    let mut response = reqwest::get(url.clone()).await?;

    let outfile = tokio::fs::File::create(&path).await?;
    let mut outfile = tokio::io::BufWriter::new(outfile);

    // Do an asynchronous, buffered copy of the download to the output file
    while let Some(chunk) = response.chunk().await? {
        outfile.write(&chunk).await?;
    }

    // Must flush tokio::io::BufWriter manually.
    // It will *not* flush itself automatically when dropped.
    outfile.flush().await?;

    Ok((url, path))
}
pub async fn download_and_write_images(list_iter:impl Iterator<Item = (Url, PathBuf)>) {
    let mut stream = 
        iter(list_iter)
            .map(|(url, path)| download_and_write(url, path))
            .buffer_unordered(10); //Is this helpful?

   //also, is this the best way... tried for_each_concurrent() but couldn't get it to work
    while let Some(res) = stream.next().await {
        let (url, path) = res.unwrap();
        println!("downloded {} to {}", url, path.as_os_str().to_str().unwrap());
    }
}
benkay86 commented 4 years ago

First off, I would recommend using the following for your type-erased generic error type. I explain the rationale in my blog post on Rust errors, but the short version is that you probably want to be able to send errors between threads.

pub type BoxError = std::boxed::Box<dyn
    std::error::Error   // must implement Error to satisfy ?
    + std::marker::Send // needed for threads
    + std::marker::Sync // needed for threads
>;

In my code I am doing almost exactly the same thing in terms of parallel downloads using the futures::stream module. I used a combination of buffer_unordered() and try_for_each() instead of a while let, although you could probably make it work either way. Try something like this (not tested):

pub async fn download_and_write_images(list_iter:impl Iterator<Item = (Url, PathBuf)>) -> Result<(), BoxError> {
    futures::stream::iter(list_iter)
        .map(|(url, path)| download_and_write(url, path))
        .buffer_unordered(10); // this will try to do 10 downloads in parallel
        .try_for_each(|res| async move {
            let (url, path) = res?;
            println!("downloaded {} to {}", url, path.as_os_str().to_str().unwrap());
            Ok(())
        });
}

A few pearls here:

Suppose you continue to use buffer_unordered(n) to limit the number of parallel downloads from a single server, but you would like to be able to download from two different servers in parallel for up to 2n concurrent connections. You could add another layer of concurrency like this:

// Group downloads from server 1.
let server1_downloads = tokio::spawn(async move {
    // initialization steps here if needed...
    download_and_write_images(server1_list).await
});

// Group downloads from server 2.
let server2_downloads = tokio::spawn(async move {
    // initialization steps here if needed...
    download_and_write_images(server2_list).await
});

// Wait for all downloads to finish.
// Downloads from servers 1 and 2 will run in parallel.
server1_downloads.await??;
server2_downloads.await??;

The pearls here:

Hope this helps get you started.

dakom commented 4 years ago

excellent, thanks!!

benkay86 commented 4 years ago

For anyone still following this, tokio now has a futures <--> tokio compatibility layer as of this PR. You can now use tokio::io::copy on a stream returned by reqwest just as @theduke originally requested.

I've posted some examples including this one of how to use the compatibility layer.

Globidev commented 8 months ago

For anyone stumbling upon this, there is now also a tokio_util::io::StreamReader which does the job:

fn response_to_async_read(resp: reqwest::Response) -> impl tokio::io::AsyncRead {
    use futures::stream::TryStreamExt;

    let stream = resp.bytes_stream().map_err(std::io::Error::other);
    tokio_util::io::StreamReader::new(stream)
}