Open hamaluik opened 4 years ago
So far, the best way I have found is to use the BytesCodec and FramedRead types from the tokio-util crate. Internally this minimizes memory allocations; see an investigation here.
So, the full code is:
use reqwest::Body;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
fn file_to_body(file: File) -> Body {
    let stream = FramedRead::new(file, BytesCodec::new());
    Body::wrap_stream(stream)
}
@nickkuk Thanks for the example, any idea how to integrate indicatif? I want to display the current progress.
Is there a way to "do something" (`bar.inc(1);`) for every chunk from the stream, or maybe from `Body`?
Thanks in advance :-)
Hi @nbari, if you mean file uploading, here I've just tried to create a Stream that consumes the stream from `FramedRead::new(file, BytesCodec::new())` and calls `inc` inside. But it does not work smoothly; maybe there is some buffering between reqwest and the system. At the moment I don't know a better way.
If you mean file downloading, just call `inc` inside this loop, but you need to know the total file size somehow; not every server gives you the content length.
Thanks, I am using this approach; `inspect_ok` does the trick:
use futures::stream::TryStreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
let stream = FramedRead::new(file, BytesCodec::new())
    .inspect_ok(|chunk| {
        // do X with chunk...
    });
let body = Body::wrap_stream(stream);
@nbari, I tried both approaches, but eventually decided that it is better to call `inc` for the previous chunk, i.e., when reqwest reads the next chunk, we assume that it has already sent all the previous ones. But that is not the case, probably due to some buffering.
Did you get a smoothly increasing progress bar?
Hi @nickkuk, I am currently working on it; indeed I am stuck with this, https://stackoverflow.com/q/63374041/1135424, and any help is much appreciated.
let stream = if let Some(mut tx) = sender {
    FramedRead::new(file, BytesCodec::new()).inspect_ok(move |chunk|
        tx.send(chunk.len())
    )
} else {
    FramedRead::new(file, BytesCodec::new())
};
// reqwest
let body = Body::wrap_stream(stream);
client.put(url).body(body)
I am getting:
`if` and `else` have incompatible types
But for other similar cases (multipart upload), I do get a nice progress bar: https://github.com/s3m/s3m/blob/develop/src/s3m/upload.rs#L55 (currently working/learning so any feedback is more than welcome)
I think you can use the `async-stream` crate, which I've already mentioned here. It allows you to write sync-like code with `yield` instead of using combinators (such as `map` or `inspect_ok`):
async_stream::stream! {
    if let Some(mut sender) = sender {
        while let Some(bytes) = stream.next().await {
            if let Ok(bytes) = &bytes {
                sender.send(bytes.len()).await;
            }
            yield bytes;
        }
    } else {
        while let Some(bytes) = stream.next().await {
            yield bytes;
        }
    }
}
@nickkuk many thanks for the examples. I was indeed struggling with the if/else until I found I could do:
let stream = async_stream::stream! {
    ...
};
Hi @nickkuk, I found this example interesting:
use tokio::sync::mpsc;
use futures::future::Either;
fn upload(file: tokio::fs::File, sender: Option<mpsc::Sender<usize>>) {
    let stream = FramedRead::new(file, BytesCodec::new());
    let stream = if let Some(mut tx) = sender {
        Either::Left(stream
            .inspect_ok(move |chunk| tx.send(chunk.len()))
        )
    } else {
        Either::Right(stream)
    };
    let body = Body::wrap_stream(stream);
}
It is not clear to me why it won't compile unless I add a `println!("{}", 0)`:
Either::Left(stream
    .inspect_ok(move |chunk| {
        tx.send(chunk.len());
        println!("{}",0);
    }))
Also, I think it requires an `.await`: `tx.send(chunk.len()).await`; but if I add it I get this error:
`await` is only allowed inside `async` functions and blocks
I am just asking to understand and learn more. I became curious because the code looks cleaner to me when using `Either`, but I would like to know the pros and cons compared with the `async-stream` crate.
Thanks in advance.
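From the documentation of `inspect_ok` you can see that it expects `F: FnMut(&Self::Ok)`, which means that the closure must return `()`.
But `tx.send(chunk.len())` returns `impl Future<Output = Result<(), SendError<T>>>`, so it is not the magic of `println`, but the magic of the semicolon: the semicolon turns the call into a statement, so the closure body evaluates to `()`. Note that the future returned by `send` is then dropped without ever being polled, so nothing is actually sent.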
About `.await` for `send`: you are right, and I use it in my example. It is needed because your `Sender` is bounded, so it will wait if there is not enough capacity. You have the following possibilities:
1) use `UnboundedSender`, whose `send` returns just `Result<(), SendError<T>>` (i.e., without a `Future`);
2) use the `async-stream` crate, which allows you to use `.await` and `yield` inside the `stream!` macro;
3) use another `Stream` combinator, e.g., `and_then`, which expects `F: FnMut(Self::Ok) -> Future`.
I suggest using `async-stream` because [...] `Either` [...];
Hi @nickkuk, many thanks for the explanation and your time on this. Taking advantage of the thread, I have one last question regarding this topic, probably too basic, but I hope you don't mind since I am trying to grasp the concepts of async/await.
To increase the progress bar (`bar.inc(1);`) or read from the `Receiver`, I found that I have to call `tokio::spawn` to do the request, which then allows me to read from the channel; otherwise, since there is no reader, I get locked. I thought that by already using `.await` I could skip the "spawn"; my mistake, probably, since I was thinking that `await` is like `go` (a goroutine in Golang). I ended up doing something like this:
let bar = ProgressBar::new(1000);
let (tx, mut rx): (mpsc::Sender<usize>, mpsc::Receiver<usize>) = mpsc::channel(100);
tokio::spawn(async move {
    my_upload(file, Some(tx)).await; // <-- method using `async-stream` that uploads the file and sends each chunk's len() through tx
});
while let Some(i) = rx.recv().await {
    bar.inc(1);
}
...
I think this is the way to go, but now that I have just learned about so many different ways to do the same task, I wonder if there is something more idiomatic to achieve the same.
First I want to clarify that in my last reply I suggested using `async-stream` not over `UnboundedSender` (actually `UnboundedSender` is more suitable for your task), but over `Stream` combinators (and even this is not always true; sometimes combinators are shorter).
Actually you don't need any `tokio::spawn` for a single file upload if you use `tokio::try_join!`; see an example here. Moreover, you can upload two, three, or more files with this approach (maybe with the same `UnboundedReceiver` for all files and a cloned `UnboundedSender`). Even an unknown number of files can be uploaded without `tokio::spawn` by using e.g. `futures::stream::FuturesUnordered`.
On `tokio::spawn` and `Future`:
1) every Rust `Future` (i.e., every `async fn` including `async fn main`, every bunch of combinators, `tokio::try_join!`, `tokio::select!`, `futures::stream::FuturesUnordered`) is a state machine; the bigger it is, the more memory it consumes;
2) a `Future` (a so-called task) is executed on zero (waiting among all tasks or on `.await`) or one system thread; `tokio::spawn` creates a new task that can be executed on another thread.
For example, `tokio::spawn` is used in the `hyper` server for every new connection and every new request on a connection.
The main drawback of `tokio::spawn` is that the consumed `Future` must be `'static`. There was an attempt to handle this constraint, but it failed.
@nickkuk many thanks for the examples and sharing the knowledge, I highly appreciate it :-)
Is there an example of saving a response body to a file asynchronously while displaying the total download progress?
I don't know how to use a buffer to read and write:
let rsp = reqwest::Client::new().get("http://aabbcc.com/1.mp4").send().await;
let file = std::fs::File::create(file).unwrap();
let mut buf = [0; 8192];
let mut reader = StreamReader::new(rsp.bytes_stream());
loop {
    // read into buf, write buf to the file, print the number of bytes read/written, until EOF
}
Hi people!
I have the code below and I'm stuck because I don't know how to `post()` the `AsyncRead`. Can you help me understand?
#[async_trait::async_trait]
impl Trait for UploadClient {
    async fn put_file(
        &self,
        filename: &str,
        mut reader: Pin<&mut (dyn AsyncRead + Send + Sync)>,
    ) -> Result<u64> {
        // let url = ...;
        let client = reqwest::Client::new();
        let resp = client
            .post(url)
            // None of these work!
            // .body(reader)
            // .body(reqwest::Body::wrap_stream(reader))
            // .body(ReaderStream::new(reader))
            .body(StreamReader::new(reader))
            .send()
            .await?
            .error_for_status()?;
        Ok(123)
    }
}
Hey @frederikhors, I think that the current standard approach is to use the tokio-util helper crate with its `BytesCodec` and `FramedRead` structures. See an example here.
@frederikhors your example would not set the upload length though, right?
There are a number of implementations of `From<File> for Body` etc., and even a function for streamwithlength, which is helpful when uploading an async tokio file stream.
I am interested in this because I have a ProgressReader (reporting on upload progress) which implements `AsyncRead` and can wrap a file stream etc., but I need it converted to a body to send it.
With some of the other examples above I am worried about backpressure and whether this is correctly handled. Based on the way async reads are handled I am pretty sure AsyncRead will always be pull based and thus handle pressure automatically.
In general, tracking progress (optionally) for sending a request should be something built in.
`reqwest::blocking::Body` can be easily built from any `Read` object. I'm working on trying to convert some of this code to async, but can't for the life of me figure out how to do something similar in async land. I assume you have to use `reqwest::Body::wrap_stream`, but how to create a proper stream from a `tokio::io::AsyncRead` appears to be beyond me, as I've been banging my head against this all day. It's easy enough to read the entire `AsyncRead` into memory and provide it directly, but when uploading a file, that requires loading the entire file into memory. Any tips would be greatly appreciated, and since I doubt I'm the only one attempting this, perhaps adding an example of posting a file with the new async API would help a lot.
Thanks!