gotham-rs / gotham

A flexible web framework that promotes stability, safety, security and speed.
https://gotham.rs
Other
2.23k stars 125 forks source link

Streaming response bodies #189

Open kbillings opened 6 years ago

kbillings commented 6 years ago

Is it possible to stream a response?

I tried using Body::pair() but I was not able to get it working.

smangelsdorf commented 6 years ago

This is definitely something I'd like to see supported, but is not something that has received any attention yet. When you used Body::pair(), did you see a type error, or something else?

kbillings commented 6 years ago

Well I first tried this:

let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();

let stream = stream::iter_ok(vec![
    Chunk::from("1\n"),
    Chunk::from("2\n"),
    Chunk::from("3\n")
].into_iter()).map(Ok);

response.set_body(body);

Box::new(
    sender.send_all(stream)
        .map_err(|e| e.into_handler_error())
        .then(|x| match x {
            Ok(_) => Ok((state, response)),
            Err(e) => Err((state, e)),
        })
)

But it just waits forever not returning anything. Then I tried this:

let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();

thread::spawn(|| {
    let stream = stream::iter_ok(vec![
        Chunk::from("1\n"),
        Chunk::from("2\n"),
        Chunk::from("3\n")
    ].into_iter()).map(Ok);

    sender.send_all(stream).wait().unwrap()
});

response.set_body(body);

But that just returns immediately with no body.

Gotham currently forces the body of a response to be a hyper::Body, if that was lifted you could set the body to any Stream<Item = Chunk> and I think it would work.

smangelsdorf commented 6 years ago

Thanks. I'll have a look into this.

millardjn commented 6 years ago

I think the problem in @kbillings's example is due to the ContentLength header is being set to 0 by set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);, preventing continued streaming of the response.

I just got streaming working with:

let mut res = Response::new();
set_headers(&state, &mut res, Some(ext_to_mime(&ext)), None);
res.headers_mut().set(CacheControl(vec![
            CacheDirective::MaxAge(86400u32),
            CacheDirective::Public,
        ]));
res.headers_mut().remove::<ContentLength>();

let (sender, body) = Body::pair();
res.set_body(body);

let stream = FS_POOL.read(pathbuf)
    .map(|bytes| Ok(bytes.into()));

let sender = sender
    .sink_map_err(|e| hyper::Error::from(::std::io::Error::new(::std::io::ErrorKind::Other, e)));

let streaming_future = sender.send_all(stream)
    .map(|(_sink, _stream)| ())
    .map_err(|_e| error!("Streaming error"));

// Pass streaming future to tokio
// TODO handle at_capacity error
DefaultExecutor::current().spawn(Box::new(streaming_future)).unwrap();

Box::new(future::ok((state, res)))

where FS_POOL is a futures_fs::FsPool returning stream from a file.

smangelsdorf commented 6 years ago

Thanks @millardjn … That would explain what I saw when I looked at this briefly. Does this fix it for you @kbillings?

ChristophWurst commented 6 years ago

Thanks for this example code, @millardjn! Did you test it with larger files? I just wrote an integration of the futures-fs crate for gotham and while streaming to responses works for small text files, it gets stuck with larger files after the FsReadStream delivers two chunks.

I've also tried a different approach where I used a hyper::Response<Box<Stream<Item = Chunk, Error = hyper::Error>>> which works from a Hyper perspective, but Gotham assumes handler futures to resolve to a Response<Body> it seems and therefore I got compilation errors.

You can find my PoC here: https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files

millardjn commented 6 years ago

Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks. All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled independently of the response future.

Good idea with the middleware. If you want to share the stalled code in a gist I'll take a look.

ChristophWurst commented 6 years ago

Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks.

My test file was a ~80 MB one. According to the printf statement I added for debugging purposes two chunks of 8192 bytes are read before it stalls.

All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled.

I'm returning the future which resolves when the read stream has been forwarded to the body stream. So I assumed that using this future to build the request handler future will eventually be passed to the event loop by Gotham and therefore will be polled.

If you want to share the stalled code in a gist I'll take a look.

See https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files and the newly added response_stream example. Replace Cargo.toml with a large file on your disk and start it with cargo run --example response_stream.

kpcyrd commented 5 years ago

I ran into this issue as well and it it's difficult to get the snippets to work without a Cargo.toml and imports, can somebody add this to the examples/ folder? :)