actix / examples

Community showcase and examples of Actix Web ecosystem usage.

Streaming responses (from an AsyncWrite) #570

Open Nerixyz opened 2 years ago

Nerixyz commented 2 years ago

I think these might be two issues in one:

  1. There's no example (except maybe the SSE one) that demonstrates streaming responses, which is probably because it's hard to come up with a good one (a minimal sketch of what I mean follows after this list).
  2. I haven't found good documentation on how to "pipe" an AsyncWrite into a Stream<Item=Bytes(Mut)> (maybe something for actix-web-lab).
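
To be clear about what I mean by a streaming response: the bare-bones shape is just `.streaming()` fed with a `Stream<Item = Result<Bytes, E>>`. A trivial, hard-coded sketch (the `/minimal` route is only for illustration):

```rust
use actix_web::{get, web::Bytes, HttpResponse, Responder};
use futures::stream;

#[get("/minimal")]
async fn minimal() -> impl Responder {
    // a hard-coded stream of chunks; a real handler would produce these lazily
    let body = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"chunk 1\n")),
        Ok(Bytes::from_static(b"chunk 2\n")),
    ]);

    HttpResponse::Ok().streaming(body)
}
```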

Note: Also see discussion on actix-web Discord: https://discord.com/channels/771444961383153695/771447545154371646/1009110232473014383

The proposed example is one that streams files from a directory (files) as a zip file, i.e. it dynamically creates the zip file.

For this I'm using async_zip, which exposes an API that requires the user to pass in an AsyncWrite (ZipFileWriter::new). To "pipe" the AsyncWrite to a Stream, I'm using a DuplexStream and the BytesCodec.

main.rs

```rust
use actix_web::{get, http, App, HttpResponse, HttpServer, Responder};
use async_zip::write::{EntryOptions, ZipFileWriter};
use futures::stream::TryStreamExt;
use std::io;
use tokio::io::AsyncWrite;
use tokio_util::codec;

#[get("/")]
async fn index() -> impl Responder {
    let (to_write, to_read) = tokio::io::duplex(2048);

    tokio::spawn(async move {
        let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });

    let stream = codec::FramedRead::new(to_read, codec::BytesCodec::new()).map_ok(|b| b.freeze());

    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream)
}

async fn read_dir<W>(zipper: &mut ZipFileWriter<W>) -> Result<(), io::Error>
where
    W: AsyncWrite + Unpin,
{
    let mut dir = tokio::fs::read_dir("files").await?;
    while let Ok(Some(entry)) = dir.next_entry().await {
        if !entry.metadata().await.map(|m| m.is_file()).unwrap_or(false) {
            continue;
        }
        let mut file = match tokio::fs::OpenOptions::new()
            .read(true)
            .open(entry.path())
            .await
        {
            Ok(f) => f,
            Err(_) => continue, // we can't read the file
        };
        let filename = match entry.file_name().into_string() {
            Ok(s) => s,
            Err(_) => continue, // the file has a non UTF-8 name
        };

        let mut entry = zipper
            .write_entry_stream(EntryOptions::new(filename, async_zip::Compression::Deflate))
            .await
            .map_err(zip_to_io_err)?;
        tokio::io::copy(&mut file, &mut entry).await?;
        entry.close().await.map_err(zip_to_io_err)?;
    }
    Ok(())
}

fn zip_to_io_err(e: async_zip::error::ZipError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    HttpServer::new(move || App::new().service(index))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```
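
For completeness, the endpoint can be exercised with actix-web's test utilities. A quick sketch that only checks the status and the Content-Disposition header, not the zip contents:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use actix_web::test;

    #[actix_web::test]
    async fn zip_endpoint_streams_an_attachment() {
        let app = test::init_service(App::new().service(index)).await;

        let req = test::TestRequest::get().uri("/").to_request();
        let resp = test::call_service(&app, req).await;

        assert!(resp.status().is_success());
        assert_eq!(
            resp.headers()
                .get(http::header::CONTENT_DISPOSITION)
                .and_then(|v| v.to_str().ok()),
            Some(r#"attachment; filename="folder.zip""#)
        );
    }
}
```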

As I explained on Discord, using a DuplexStream is probably overkill, since it's meant to be used bidirectionally (see the example in the tokio docs), so I tried to extract the internal Pipe used by the tokio implementation and made a pipe specifically for (buffered) piping of an AsyncWrite to a Stream<BytesMut>. I'm not sure if this should be included in actix-web-lab as a utility for dealing with AsyncWrite (or maybe in some other crate?).

async_pipe.rs

```rust
use std::{
    pin::Pin,
    sync::{Arc, Mutex, MutexGuard},
    task::{self, Poll, Waker},
};

use bytes::{Buf, BytesMut};
use futures::Stream;
use tokio::io::AsyncWrite;

/// The `AsyncWrite` half of an [`async_pipe`]
pub struct AsyncPipeWriter(Arc<Mutex<Pipe>>);

/// The `Stream` half of an [`async_pipe`]
pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);

/// Creates a buffered pipe that pipes writes from an `AsyncWrite` to a `Stream`.
///
/// `max_buf_size` is the maximum amount of bytes that can be written to the pipe's internal buffer
/// before further writes return `Poll::Pending`.
pub fn async_pipe(max_buf_size: usize) -> (AsyncPipeWriter, AsyncPipeReader) {
    let pipe = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
    (AsyncPipeWriter(pipe.clone()), AsyncPipeReader(pipe))
}

/// A unidirectional IO over a piece of memory.
///
/// Data can be written to the pipe, and reading will return that data.
///
/// Adapted from [tokio's](https://github.com/tokio-rs/tokio/blob/de81985762a242c77361a6ab9de198372ca85987/tokio/src/io/util/mem.rs#L54-L76)
/// internal representation of a pipe for a `tokio::io::DuplexStream`.
#[derive(Debug)]
struct Pipe {
    /// The buffer storing the bytes written, also read from.
    ///
    /// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
    /// functionality already.
    buffer: BytesMut,
    /// Determines if the write side has been closed.
    is_closed: bool,
    /// The maximum amount of bytes that can be written before returning
    /// `Poll::Pending`.
    max_buf_size: usize,
    /// If the `read` side has been polled and is pending, this is the waker
    /// for that parked task.
    read_waker: Option<Waker>,
    /// If the `write` side has filled the `max_buf_size` and returned
    /// `Poll::Pending`, this is the waker for that parked task.
    write_waker: Option<Waker>,
}

impl Pipe {
    fn new(max_buf_size: usize) -> Self {
        Pipe {
            buffer: BytesMut::new(),
            is_closed: false,
            max_buf_size,
            read_waker: None,
            write_waker: None,
        }
    }

    fn close_write(&mut self) {
        self.is_closed = true;
        // needs to notify any readers that no more data will come
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
    }

    fn close_read(&mut self) {
        self.is_closed = true;
        // needs to notify any writers that they have to abort
        if let Some(waker) = self.write_waker.take() {
            waker.wake();
        }
    }

    fn poll_read_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<BytesMut>> {
        if self.buffer.has_remaining() {
            let bytes = std::mem::take(&mut self.buffer);
            if let Some(waker) = self.write_waker.take() {
                waker.wake();
            }
            Poll::Ready(Some(bytes))
        } else if self.is_closed {
            Poll::Ready(None)
        } else {
            self.read_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn poll_write_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        if self.is_closed {
            return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
        }
        let avail = self.max_buf_size - self.buffer.len();
        if avail == 0 {
            self.write_waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        let len = buf.len().min(avail);
        self.buffer.extend_from_slice(&buf[..len]);
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
        Poll::Ready(Ok(len))
    }
}

impl AsyncWrite for Pipe {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        self.poll_write_internal(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        _: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.close_write();
        Poll::Ready(Ok(()))
    }
}

impl AsyncWrite for AsyncPipeWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Pin::new(&mut *always_lock(&self.0)).poll_write(cx, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_shutdown(cx)
    }
}

impl Stream for Pipe {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_read_internal(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.buffer.remaining();
        (remaining, Some(remaining))
    }
}

impl Stream for AsyncPipeReader {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut *always_lock(&self.0)).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        always_lock(&self.0).size_hint()
    }
}

impl Drop for AsyncPipeWriter {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_write();
    }
}

impl Drop for AsyncPipeReader {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_read();
    }
}

#[inline]
fn always_lock<T>(mtx: &Mutex<T>) -> MutexGuard<'_, T> {
    match mtx.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    }
}
```
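
For what it's worth, here's a rough sanity check of the pipe in isolation (a sketch; it assumes tokio's `rt` and `macros` features are enabled so `#[tokio::test]` is available):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt as _;
    use tokio::io::AsyncWriteExt as _;

    #[tokio::test]
    async fn writes_show_up_on_the_stream() {
        // small buffer so the writer has to wait for the reader to drain it
        let (mut writer, mut reader) = async_pipe(8);

        tokio::spawn(async move {
            writer.write_all(b"hello world").await.unwrap();
            // dropping the writer closes the pipe, which ends the stream
        });

        let mut received = Vec::new();
        while let Some(chunk) = reader.next().await {
            received.extend_from_slice(&chunk);
        }
        assert_eq!(received, b"hello world");
    }
}
```
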
index handler with async_pipe

```rust
#[get("/")]
async fn index() -> impl Responder {
    let (to_write, stream) = async_pipe(2048);

    tokio::spawn(async move {
        let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });

    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
}
```
Cargo.toml

```toml
[package]
name = "actix-zippy"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4.1.0"
async_zip = "0.0.8"
bytes = "1.2.1"
futures = "0.3.23"
tokio = { version = "1.20.1", features = ["io-util", "fs"] }
tokio-stream = "0.1.9"
tokio-util = { version = "0.7.3", features = ["codec"] }
```
pashinin commented 2 years ago

Where does stream.map(...) come from?

```
44 |         .streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
   |                           ^^^ `AsyncPipeReader` is not an iterator
   |
  ::: async_pipe.rs:9:1
   |
9  | pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);
   | --------------------------------------------- doesn't satisfy `AsyncPipeReader: Iterator`
```

Nerixyz commented 2 years ago

> Where does stream.map(...) come from?

It's from futures::stream::StreamExt.
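
In other words, the extension trait just needs to be in scope in the handler file:

```rust
// in main.rs, alongside the other imports:
use futures::stream::StreamExt as _; // provides `.map()` on streams such as `AsyncPipeReader`
```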

robjtede commented 2 years ago

> maybe something for actix-web-lab

had a play with it and came up with something similar to the body::channel ideas: