Nullus157 / async-compression

Adaptors between compression crates and Rust's async IO types
https://docs.rs/async-compression
Apache License 2.0

`ZstdEncoder` should not panic if `tokio::io::AsyncWrite::flush` is called after `tokio::io::AsyncWrite::shutdown` #246

Closed. aatifsyed closed this issue 9 months ago.

aatifsyed commented 1 year ago

I'm hitting a `Flush after shutdown` panic, ultimately raised here: https://github.com/Nullus157/async-compression/blob/ece5584ce59a683c38e94f9957969e1bdb08b665/src/tokio/write/generic/encoder.rs#L97

Repro

Here's a minimal example, which reliably panics on my machine.

use async_compression::tokio::write::ZstdEncoder; // 0.4.3
use bytes::Bytes; // 1.5.0
use futures::{stream, StreamExt as _}; // 0.3.28
use std::io; // 1.72.0
use tokio::fs::File; // 1.32.0
use tokio_util::codec::{BytesCodec, FramedWrite}; // 0.7.8

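async fn _main() -> io::Result<()> {
    let file = File::create("/dev/null").await?;
    // Compress everything written to the file.
    let zstd_encoder = ZstdEncoder::new(file);
    // Wrap the encoder as a `Sink<Bytes>`.
    let bytes_sink = FramedWrite::new(zstd_encoder, BytesCodec::new());
    // The stream is empty, so `forward` just closes the sink, which runs
    // `poll_close`: flush the encoder, then shut it down.
    stream::empty::<io::Result<Bytes>>()
        .forward(bytes_sink)
        .await
}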

#[tokio::main]
async fn main() -> io::Result<()> {
    _main().await
}

Explanation

Here's the sequence of events:

  ┌───────────────────────────────────┐ ┌──────────────────────────────────┬────────────────────────────────┐ 
  │ file::poll_flush -> Ready(Ok(())) │ │ file::poll_write -> Ready(Ok(9)) │ file::poll_shutdown -> Pending │ 
 ┌┴───────────────────────────────────┴┬┴──────────────────────────────────┴────────────────────────────────┴┐ ┌─────────────BOOM
 │  zstd::poll_flush -> Ready(Ok(()))  │                    zstd::poll_shutdown -> Pending                   │ │ zstd::poll_flush
┌┴─────────────────────────────────────┴─────────────────────────────────────────────────────────────────────┴┬┴─────────────BOOM
│                                      sink::poll_close -> Pending                                            │  sink::poll_close
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────BOOM
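The sequence in the diagram can be replayed without tokio or zstd. Below is a minimal, std-only sketch; `MockFile`, `MockEncoder`, `State` and `sink_poll_close` are hypothetical stand-ins (not async-compression's or tokio_util's actual types), and `Context` and `io::Result` are dropped for brevity. The mock encoder panics on flush once shutdown has started, mirroring the panic reported above, and the mock file returns `Pending` from its first `poll_shutdown`.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the inner `File`: the first `poll_shutdown`
/// returns `Pending`, later calls return `Ready`.
struct MockFile {
    shutdown_calls: u32,
}

impl MockFile {
    fn poll_flush(&mut self) -> Poll<()> {
        Poll::Ready(())
    }

    fn poll_write(&mut self, buf: &[u8]) -> Poll<usize> {
        Poll::Ready(buf.len())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

#[derive(Debug, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Hypothetical, simplified encoder: flushing once shutdown has begun panics,
/// like the "Flush after shutdown" panic in this issue.
struct MockEncoder {
    inner: MockFile,
    state: State,
}

impl MockEncoder {
    fn new(inner: MockFile) -> Self {
        Self { inner, state: State::Encoding }
    }

    fn poll_flush(&mut self) -> Poll<()> {
        match self.state {
            State::Encoding => self.inner.poll_flush(),
            State::Finishing | State::Done => panic!("Flush after shutdown"),
        }
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        if self.state == State::Encoding {
            // Write out the remaining compressed bytes (9 in the diagram above).
            let _ = self.inner.poll_write(&[0u8; 9]);
            self.state = State::Finishing;
        }
        match self.inner.poll_shutdown() {
            Poll::Ready(()) => {
                self.state = State::Done;
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Same shape as tokio_util's `poll_close`: flush, then shut down, with no
/// record of which step already finished on a previous call.
fn sink_poll_close(enc: &mut MockEncoder) -> Poll<()> {
    if enc.poll_flush().is_pending() {
        return Poll::Pending;
    }
    enc.poll_shutdown()
}
```

Polling `sink_poll_close` once returns `Pending` and leaves the encoder in `Finishing`; polling it again calls `poll_flush` first and hits the panic, which corresponds to the BOOM column in the diagram.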

Discussion

The bug comes from an interaction between the code above and this code in tokio_util:

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        ready!(self.project().inner.poll_shutdown(cx))?;

        Poll::Ready(Ok(()))
    }


A fix could be either:

  1. tokio_util::codec could track whether poll_close has already flushed the inner writer, so that it doesn't flush it twice.
  2. Remove the panic in async_compression, or extend its state machine to handle the sequence above.
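Fix (1) amounts to making `poll_close` remember its progress. A minimal std-only sketch, assuming a hypothetical `StrictWriter` that panics on flush after shutdown has begun (as the encoder does here) and a hypothetical `CloseOnce` sink; this is not tokio_util's actual code:

```rust
use std::task::Poll;

/// Hypothetical writer that, like the encoder in this issue, panics if it is
/// flushed after shutdown has begun. Its first `poll_shutdown` is `Pending`.
struct StrictWriter {
    shutting_down: bool,
    shutdown_calls: u32,
}

impl StrictWriter {
    fn poll_flush(&mut self) -> Poll<()> {
        assert!(!self.shutting_down, "Flush after shutdown");
        Poll::Ready(())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutting_down = true;
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Hypothetical sink implementing fix (1): once the flush in `poll_close`
/// has completed, later calls skip it and go straight to shutdown.
struct CloseOnce {
    inner: StrictWriter,
    flushed: bool,
}

impl CloseOnce {
    fn poll_close(&mut self) -> Poll<()> {
        if !self.flushed {
            if self.inner.poll_flush().is_pending() {
                return Poll::Pending;
            }
            self.flushed = true;
        }
        self.inner.poll_shutdown()
    }
}
```

This only protects callers that carry the extra flag; any other caller that flushes after a pending shutdown would still hit the panic.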

I think the right fix is (2):

The documentation for AsyncWrite doesn't say that you're not allowed to flush after calling shutdown. In fact, I think implementors should be prepared to handle that case, at least until shutdown returns Poll::Ready.
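Fix (2) can be sketched under the same simplifications (std only, no `Context`, no errors): a hypothetical `TolerantEncoder` whose `poll_flush` is accepted in every state instead of panicking. In this sketch the trailer is assumed to be fully handed to the inner writer when shutdown starts, so a flush during `Finishing` only needs to flush the inner writer; the real encoder's finishing logic is more involved, and this is not async-compression's actual implementation.

```rust
use std::task::Poll;

#[derive(Debug, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Hypothetical inner writer: counts flushes, and its first `poll_shutdown`
/// returns `Pending`.
struct Inner {
    flushes: u32,
    shutdown_calls: u32,
}

impl Inner {
    fn poll_flush(&mut self) -> Poll<()> {
        self.flushes += 1;
        Poll::Ready(())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Hypothetical encoder implementing fix (2): flush is valid in every state.
struct TolerantEncoder {
    inner: Inner,
    state: State,
}

impl TolerantEncoder {
    fn new(inner: Inner) -> Self {
        Self { inner, state: State::Encoding }
    }

    fn poll_flush(&mut self) -> Poll<()> {
        match self.state {
            // Before and during shutdown: all output produced so far has been
            // handed to `inner` (in this sketch), so flushing it is safe.
            State::Encoding | State::Finishing => self.inner.poll_flush(),
            // Shutdown completed: nothing is left to flush.
            State::Done => Poll::Ready(()),
        }
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        if self.state == State::Done {
            return Poll::Ready(());
        }
        // In this sketch, moving to `Finishing` stands in for writing the
        // compressed trailer to `inner`.
        self.state = State::Finishing;
        match self.inner.poll_shutdown() {
            Poll::Ready(()) => {
                self.state = State::Done;
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Replaying the diagram's sequence against this encoder (flush, pending shutdown, flush again, shutdown) completes without a panic.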

Take the following from the docs:

Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.

So, following the API, I could write a simple `Transparent<T: AsyncWrite>` wrapper:

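```rust
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use pin_project_lite::pin_project;
use tokio::io::AsyncWrite;

pin_project! {
    struct Transparent<T> {
        #[pin]
        inner: T,
    }
}

impl<T> AsyncWrite for Transparent<T>
where
    T: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.project().inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut this = self.project();
        // Flush, then shut down. If the inner shutdown returns Pending, the next
        // call flushes the inner writer again: a flush after shutdown has begun.
        ready!(this.inner.as_mut().poll_flush(cx))?;
        this.inner.poll_shutdown(cx)
    }
}

impl<T> Transparent<T> {
    fn new(inner: T) -> Self {
        Self { inner }
    }
}
```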

which, of course, panics if it contains a ZstdEncoder.

In fact, with the appropriate interleaving of Poll::Pending, ZstdEncoder::new(ZstdEncoder::new(...)) will panic.

I'm pretty sure this affects all tokio codecs in this crate.

robjtede commented 9 months ago

Should be fixed by #255, pending a release tomorrow.

aatifsyed commented 9 months ago

I think this is still a logic bug: the encoder should not return an error in this case.

Would you prefer that I reword this issue or open a new one?

robjtede commented 9 months ago

Okay. I'd just tested your repro of the panic and saw that it didn't panic on master.

New issue would be great please.

aatifsyed commented 1 week ago

> New issue would be great please.
