actix / actix-web

Actix Web is a powerful, pragmatic, and extremely fast web framework for Rust.
https://actix.rs
Apache License 2.0
21.6k stars 1.67k forks source link

Unable to produce chunked response progressively #1066

Closed wangjia184 closed 5 years ago

wangjia184 commented 5 years ago

Rust Version : 1.39 nightly actix-web version : 1.0.7

The following code does not produce any response and HTTP connection hangs although unbounded_send() returns a success and Success is printed.

It seems actix-web cannot send out chunked response progressively before the connection completes in actix::spawn.

main.rs

use futures01::stream::Stream;
use futures01::sync::mpsc;
use futures::future::{FutureExt, TryFutureExt};

use actix_web::*;
use bytes::Bytes;
use std::{thread, time};

fn index(req: HttpRequest) -> impl Responder {
    let (tx, rx_body) = mpsc::unbounded();

    let text = "Something!\n";

    let fu = async move {

        loop {
            // we will await here, for now send a hardcode string to test
            let result = tx.unbounded_send(Bytes::from(text.as_bytes()));
            if let Err(e) = result {
                println!("{}", e);
                return;
            } else {
                println!("Success");
            }
            let ten_millis = time::Duration::from_millis(100);
            thread::sleep(ten_millis);
        }
    };
    actix::spawn(fu.unit_error().boxed_local().compat());

    HttpResponse::Ok().streaming(rx_body.map_err(|_| error::ErrorInternalServerError("Internal Server Error")))
}

pub fn main(){
    let _ = HttpServer::new(|| {
        App::new()
            .wrap(middleware::DefaultHeaders::new().header("Cache-control", "no-cache"))
            .wrap(middleware::Compress::default())
            .wrap(middleware::Logger::default())
            .service(
                web::resource("/").route(web::get().to(index)),
            )
    })
    .bind("0.0.0.0:8080")
    .expect("Unable to bind port 8080")
    .run()
    .expect("Failed to start HTTP server");
}

cargo.toml

[dependencies]
actix = "0.8.3"
bytes = "0.4.12"
futures01 = { package = "futures", version = "0.1", optional = false }

[dependencies.actix-web]
version = "1.0.7"
features = ["ssl"]

[dependencies.futures-preview]
version = "0.3.0-alpha.18"
default-features = false
features = ["compat", "async-await", "nightly"]
wangjia184 commented 5 years ago

Funny thing is, changing from actix::spawn to thread::spawn would make it work!

fn index(req: HttpRequest) -> impl Responder {
    let (tx, rx_body) = mpsc::unbounded();

    let text = "Something!\n";

    thread::spawn(move || loop {
        let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));
        let ten_millis = time::Duration::from_millis(100);
        thread::sleep(ten_millis);
    });

    HttpResponse::Ok().streaming(rx_body.map_err(|_| error::ErrorInternalServerError("Internal Server Error")))
}

What is the secret behind actix::spawn? why does thread::spawn work instead?

fafhrd91 commented 5 years ago

Remove thread::sleep

wangjia184 commented 5 years ago

@fafhrd91 tried that. Removing thread::sleep does not make any difference. unbounded_send() is being called many times and there is no response at all.

fafhrd91 commented 5 years ago

Because your code blocks and never yields

I guess, this line needs to be let result = tx.unbounded_send(Bytes::from(text.as_bytes())).await; or something like this

wangjia184 commented 5 years ago

Thanks @fafhrd91

The return value of unbounded_send() is type of Result<(), SendError<T>> which cannot await.

However, I take your advice and await on a non-blocking timer, it works!

    let fu = async move {

        loop {
            // we will await here, for now send a hardcode string to test
            let result = tx.unbounded_send(Bytes::from(text.as_bytes()));
            if let Err(e) = result {
                println!("{}", e);
                return;
            } else {
                println!("Success");
            }
            futures_timer::Delay::new(std::time::Duration::from_secs(1)).await;
        }
    };
    actix::spawn(fu.unit_error().boxed_local().compat());
wangjia184 commented 5 years ago

I found there is another key point. App::new().wrap(middleware::Compress::default()) has to be removed to make it work.

If compression middleware is enabled, progressive response is impossible. Is this a bug?

fafhrd91 commented 5 years ago

It is not bug. Compression works

tomocrafter commented 4 years ago

I copied your source code but it doesn't work. How did you solve this problem? It says "Success" but never receive any message.

wangjia184 commented 4 years ago

@tomocrafter "Success" means the channel is closed. you need keep the channel open and send data to it.

tomocrafter commented 4 years ago

@wangjia184 Thank you for quick reply. It continues to print with Success regularly. but When I close the connection of http, and then the streaming immediately stops and prints "send failed because receiver is gone". I don't think channel is closed.