PSeitz / lz4_flex

Fastest pure Rust implementation of LZ4 compression/decompression.
MIT License

AsyncWrite support #15

Open Alexei-Kornienko opened 3 years ago

Alexei-Kornienko commented 3 years ago

It would be good to add impls of FrameEncoder/FrameDecoder where W: AsyncWrite (futures::io::AsyncWrite) and R: AsyncRead.

This would give users the ability to use compression in async data streams.

PSeitz commented 3 years ago

Thanks for the issue, yeah I also thought about that. It would be nice to have, but I'm not sure how well having both can be managed in terms of code duplication.

Alexei-Kornienko commented 3 years ago

One solution that I could propose is:

1. Refactor the implementation to always work with the AsyncWrite trait.
2. Add a thin wrapper that takes any type implementing Write and impls AsyncWrite for it without ever returning Poll::Pending.

As a result, the async version would be used as-is. The sync version could internally use block_on (https://docs.rs/futures/0.3.15/futures/executor/fn.block_on.html) to drive the async call to completion.
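A minimal sketch of the proposed thin wrapper, using only the standard library: the `PollWrite` trait below is an illustrative stand-in for futures::io::AsyncWrite (the real trait additionally takes Pin<&mut Self> and a &mut Context<'_>), and the `BlockingWriter` name is hypothetical, not from lz4_flex.

```rust
use std::io::{self, Write};
use std::task::Poll;

// Illustrative stand-in for futures::io::AsyncWrite's poll_write method.
trait PollWrite {
    fn poll_write(&mut self, buf: &[u8]) -> Poll<io::Result<usize>>;
}

// Thin wrapper: any blocking `Write` becomes a `PollWrite` that never
// returns `Poll::Pending`, as the proposal suggests.
struct BlockingWriter<W: Write>(W);

impl<W: Write> PollWrite for BlockingWriter<W> {
    fn poll_write(&mut self, buf: &[u8]) -> Poll<io::Result<usize>> {
        // The sync writer completes (or errors) immediately.
        Poll::Ready(self.0.write(buf))
    }
}

fn main() {
    let mut w = BlockingWriter(Vec::new());
    match w.poll_write(b"hello") {
        Poll::Ready(Ok(n)) => {
            assert_eq!(n, 5);
            assert_eq!(w.0, b"hello".to_vec());
        }
        _ => unreachable!("a blocking writer is always ready"),
    }
    println!("wrote {} bytes", w.0.len());
}
```

The inverse direction (sync-over-async via block_on) would follow the same shape, with the sync Write impl driving the async future to completion on each call.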


arthurprs commented 3 years ago

One significant problem right now is that the ecosystem is split regarding AsyncRead/AsyncWrite: every runtime (tokio, ...) has its own set of traits, and the futures crate has yet another.

PSeitz commented 1 year ago

A good idea may be to add lz4_flex support to https://github.com/Nemo157/async-compression

cpick commented 2 months ago

The core of my ugly workaround on the AsyncRead side is:

const CHUNK_LENGTH: usize = 1024 * 1024; // 1 MiB (arbitrary)
let compressed_chunk_max_length = lz4_flex::block::get_maximum_output_size(CHUNK_LENGTH);

let mut reader = lz4_flex::frame::FrameDecoder::new(ChunkReader::new());

loop {
    while reader.get_ref().len() < compressed_chunk_max_length {
        let Some(received) = stream.recv_data().await.context("receive data")? else {
            break;
        };
        reader.get_mut().append(received);
    }

    let read_length = {
        // FIXME: ideally wouldn't initialize
        let buffer = if buffer.get_ref().remaining() > CHUNK_LENGTH {
            buffer.get_mut().initialize_unfilled_to(CHUNK_LENGTH)
        } else {
            buffer.get_mut().initialize_unfilled()
        };

        reader.read(buffer).context("read chunk")?
    };
    buffer.get_mut().advance(read_length);

    // break at end of stream
    if read_length == 0 {
        break;
    }
}

Where buffer is a &mut buf::Writer<ReadBuf<'_>>, stream is an h3::client::RequestStream, and ChunkReader is a custom type that implements bytes::Buf and is based on an inner VecDeque<buf::Reader<B>>.

This attempts to read chunks off the connection asynchronously and decompress them piecemeal without ever blocking.

Likewise on the AsyncWrite side:

let mut writer = lz4_flex::frame::FrameEncoder::new(vec![].writer());

for chunk in buffer.chunks(1024 * 1024 /* 1 MiB (arbitrary) */) {
    writer.write_all(chunk).context("encoder write all")?;
    let buffer = mem::take(writer.get_mut().get_mut());
    stream.send_data(buffer.into()).await.context("send data")?;
}

let buffer = writer.finish().context("encoder finish")?.into_inner();
stream
    .send_data(buffer.into())
    .await
    .context("send finished data")?;

Where buffer is a &[u8] and stream is an h3::client::RequestStream.

This attempts to periodically steal the compressed data from the encoder and asynchronously write it to the connection.

I don't really like how these turned out or think they're a clean solution, but wanted to share what has been working for me in case it helps anyone else.

link2xt commented 1 month ago

> Thanks for the issue, yeah I also thought about that. It would be nice to have, but I'm not sure how well having both can be managed in terms of code duplication.

There should be some API that does not depend on std::io::Read, tokio::io::AsyncRead, or futures::io::AsyncRead. This is usually referred to as a sans-I/O API.

Then everyone can implement their own Read, Write, AsyncRead and AsyncWrite traits on top of this as a thin layer, e.g. async-compression can provide async implementations for futures and tokio.

I looked into the code a bit, but it seems that a lot of the logic currently lives inside lz4_flex::frame::FrameDecoder and lz4_flex::frame::FrameEncoder. This should be factored out before attempting to implement async support; otherwise the async-compression crate will end up copying a lot of code from this crate.

A good example is the flate2 crate: it has Compress and Decompress structures that take input and output buffers but don't manage the buffering and I/O themselves.
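To illustrate the shape of such a sans-I/O API, here is a sketch in the style of flate2's Compress: the caller owns all buffers and all I/O, and the codec only transforms bytes. The `Compress`, `Status`, and `Encoder` types below are illustrative (the toy codec just copies bytes), not lz4_flex or flate2 code.

```rust
use std::io::{self, Write};

// Hypothetical sans-I/O codec: no Read/Write/AsyncRead inside, only buffers.
struct Compress;

enum Status {
    Ok, // flate2 also has StreamEnd and BufError
}

impl Compress {
    /// Consume input, produce output, and report how much of each was used.
    /// Identity "compression" here, purely to show the API shape.
    fn compress(&mut self, input: &[u8], output: &mut [u8]) -> (usize, usize, Status) {
        let n = input.len().min(output.len());
        output[..n].copy_from_slice(&input[..n]);
        (n, n, Status::Ok)
    }
}

// A blocking `Write` front-end is then just a thin buffering layer;
// an `AsyncWrite` front-end would wrap the very same codec the same way.
struct Encoder<W: Write> {
    codec: Compress,
    inner: W,
}

impl<W: Write> Write for Encoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut out = [0u8; 4096];
        let (consumed, produced, _status) = self.codec.compress(buf, &mut out);
        self.inner.write_all(&out[..produced])?;
        Ok(consumed)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

fn main() {
    let mut enc = Encoder { codec: Compress, inner: Vec::new() };
    enc.write_all(b"hello sans-io").unwrap();
    assert_eq!(enc.inner, b"hello sans-io".to_vec());
    println!("encoded {} bytes", enc.inner.len());
}
```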

brandonros commented 3 weeks ago

https://docs.rs/flate2/1.0.34/src/flate2/deflate/bufread.rs.html#116-118

flate2 encoders for sure just use std::io::Write and not this cool sans I/O thing you are talking about, right?

link2xt commented 3 weeks ago

> flate2 encoders for sure just use std::io::Write and not this cool sans I/O thing you are talking about, right?

async-compression does not use std::io::Write, but the Compress API instead: https://github.com/Nullus157/async-compression/blob/f0a5c7420de645bbb76898866c11cb5a50ecf89d/src/codec/flate/encoder.rs#L33-L35

Same for the sync compressor you linked to: it also uses the sans-I/O Compress API internally.

Once you have something like a compress function that operates on memory buffers, you can implement both std::io::Write and AsyncWrite on top of it. But lz4_flex implements the compressor directly in its std::io::Write implementation, so writing AsyncWrite is not possible without copy-pasting the code from the sync implementation, or wrapping the sync implementation by running it on a separate thread. (The latter is what e.g. the tokio::fs module does to wrap std::fs, but that's hacky and means the underlying file is owned by a thread, or you need to add synchronization on top.)
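The thread-offload workaround mentioned above can be sketched with standard-library channels: a dedicated thread owns the blocking codec, and callers communicate with it via message passing. The `toy_compress` function is a hypothetical stand-in for a blocking lz4_flex call, not a real API; an async runtime would use its own channels and spawn_blocking-style helpers instead.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a blocking compression call.
fn toy_compress(chunk: Vec<u8>) -> Vec<u8> {
    chunk // identity stand-in; a real codec would transform the bytes
}

fn main() {
    let (in_tx, in_rx) = mpsc::channel::<Vec<u8>>();
    let (out_tx, out_rx) = mpsc::channel::<Vec<u8>>();

    // The worker thread owns the (hypothetical) sync encoder state,
    // mirroring how tokio::fs hands file ops to a blocking thread.
    let worker = thread::spawn(move || {
        for chunk in in_rx {
            out_tx.send(toy_compress(chunk)).unwrap();
        }
    });

    in_tx.send(b"hello".to_vec()).unwrap();
    drop(in_tx); // close the input channel so the worker exits

    let compressed: Vec<Vec<u8>> = out_rx.iter().collect();
    worker.join().unwrap();
    assert_eq!(compressed, vec![b"hello".to_vec()]);
    println!("received {} compressed chunk(s)", compressed.len());
}
```

This avoids copy-pasting the sync code, at the cost of the synchronization overhead the comment describes; a sans-I/O core would make both front-ends direct instead.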