actix / actix-web

Actix Web is a powerful, pragmatic, and extremely fast web framework for Rust.
https://actix.rs
Apache License 2.0

Improve streaming interface #1690

Open frostyplanet opened 4 years ago

frostyplanet commented 4 years ago

We use actix-web to handle some large file downloads. Currently, ResponseBuilder only provides the streaming() interface, which requires the user to implement the futures::stream::Stream trait (https://docs.rs/actix-web/3.0.2/actix_web/dev/struct.HttpResponseBuilder.html#method.streaming). I think a trait similar to tokio::io::AsyncRead would be preferable to the Stream interface with modern async/await, based on the following reasons:

1) Reading via AsyncRead only pins a generated future on the stack, while implementing a Stream usually requires polling other async fns through a boxed future, which is not efficient. And the Rust generator feature does not seem likely to be stabilized soon.

2) We discovered that in some cases the Stream will be dropped once the HTTP client decides to stop reading it, either after the whole content-length has been read or on an early close due to an error. The stream is simply dropped rather than being polled until next() returns None, so there is no chance for the stream to clean up its resources asynchronously (i.e. to notify the remote storage to close the file connection).

fakeshadow commented 4 years ago

For now, you can use a channel to write async-function-style stream handling as an alternative.

use actix_web::{get, App, HttpServer, HttpResponse, Error};
use actix_web::web::{Payload, Bytes};
use futures::StreamExt;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(index))
        .bind("0.0.0.0:8080")?
        .run()
        .await
}

#[get("/")]
async fn index(mut payload: Payload) -> HttpResponse {
    // The receiver half implements Stream, so it can be passed to streaming().
    let (tx, rx) = actix_utils::mpsc::channel::<Result<Bytes, Error>>();
    // Forward the request payload through the channel from a spawned task.
    actix_web::rt::spawn(async move {
        if tx.send(Ok(Bytes::from_static(b"trying to echo"))).is_err() {
            return;
        };

        while let Some(chunk) = payload.next().await {
            if tx.send(chunk.map_err(Into::into)).is_err() {
                return;
            }
        }
    });

    HttpResponse::Ok().streaming(rx)
}
frostyplanet commented 4 years ago

Thanks. I think a channel may be slower compared with implementing a Stream based on a boxed future. In addition, I've managed to replace all the async fns with sized futures, but the code is far more hand-crafted and complex than simply stacking async fn calls. I've read through the actix-web and actix-net repos; most of the code is hand-crafted futures from the pre-async/await era, efficient but hard to maintain, and a refactor would take a lot of work.

snaggen commented 2 years ago

For now, you can use a channel to write async-function-style stream handling as an alternative.

@fakeshadow but the solution you suggest will run into the issue that Payload doesn't implement Send, and hence is only safe on single-threaded runtimes; however, the multi-threaded runtime is the default on most systems. See https://rust-lang.github.io/rust-clippy/master/index.html#future_not_send

As it is now, a payload cannot safely be handled in the background unless you use a single-threaded runtime, from what I can see.

EDIT: Ignore this, I see that actix_web::rt::spawn uses spawn_local under the hood.

robjtede commented 2 years ago

We're experimenting in the lab with some APIs to make streaming responses easier and off-thread compatible.

LinusCDE commented 1 year ago

Seems my issue (#2774) is the exact case study for this.

I also tried the channel approach from the example, but that channel is no longer available in actix_utils. I also tried the Sender and Channel body types from actix-web-lab, but those seem to wait for the entire response before actually sending it, which is not what I'm looking for.