Majored / rs-async-zip

An asynchronous ZIP archive reading/writing crate.
MIT License

[Question][Offtopic] Async Zip Compat #93

Closed arifd closed 1 year ago

arifd commented 1 year ago

Hi Harry!

This crate is absolutely wonderful, it saved me over a year ago when I was too dumb to figure out async_compression, and your package (back at 0.0.7) wrapped it and did all the dirty work for me.

Now I just bumped the package to 0.0.15 to take advantage of some of the compression algorithms being behind feature flags and improve compile times, but I noticed a couple of things:

  1. it no longer wraps async_compression. Why not?
  2. It has introduced this Compat thing, which I did not need before.

I was hoping you wouldn't mind explaining this Compat situation and why it is now needed. In the process I hope to gain a better understanding of the Rust async ecosystem! Thank you!!

For reference, here is my function that uses your library, and the comments IN CAPS detail the changes I had to make to bump to 0.0.15:


#[get("/download")]
pub async fn directories_download(
    db_pool: web::Data<DbPool>,
    blob_storage: web::Data<BlobStorage>,
    query: web::Query<DownloadDirectoryRequest>,
    id: Identity,
) -> StreamingResponse<ReaderStream<impl AsyncRead>> {
    tracing::Span::current().record("query", query.as_value());
    let user_id =
        require_user_login(id).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

    // Prepare a stream that will receive the compressed bytes
    let (mut compressed_tx, compressed_rx) = tokio::io::duplex(1024);
    // I NEEDED TO `.compat()` THIS
    let compressed_tx = compressed_tx.compat();

    // Get a list of hashes and paths that we need to compress
    let files_to_zip = {
        let mut files = directories_get_children_hashes_and_paths_for_directory_download(
            db!(db_pool),
            user_id,
            query.directory_entry_id,
        )
        .await?;

        if query.deduplicate == Some(true) {
            deduplicate(&mut files)
        };

        files
    };

    tokio::spawn(async move {
        // Prepare our ZipFileWriter
        let mut zip_archive = ZipFileWriter::new(compressed_tx);

        for (hash, path) in files_to_zip {
            let (mut uncompressed_tx, mut uncompressed_rx) = tokio::io::duplex(1024);

            let mut entry_writer = zip_archive
                .write_entry_stream(ZipEntryBuilder::new(path.into(), Compression::Deflate))
                .await
                .expect("Couldn't create an EntryStreamWriter")
                // NEEDED `.compat_write()` HERE
                .compat_write();

            // Begin streaming into the channel
            let blob_storage = blob_storage.clone();
            tokio::spawn(async move {
                blob_storage
                    .retrieve_file_streaming(&hash, &mut uncompressed_tx)
                    .await
                    .expect("blob storage could not retrieve file");
            });

            // Copy from channel into the entry_writer
            tokio::io::copy(&mut uncompressed_rx, &mut entry_writer)
                .await
                .expect("couldn't copy uncompressed bytes into the EntryStreamWriter");

            // finalize this file's compression
            entry_writer
                // NEEDED TO GET THE `EntryStreamWriter` BACK OUT
                // TO BE ABLE TO `.close()` IT
                .into_inner()
                .close()
                .await
                .expect("couldn't shutdown the EntryStreamWriter");
        }

        // When all uncompressed_streams have completed we can close off
        // the ZipFileWriter
        zip_archive
            .close()
            .await
            .expect("couldn't close the zip file");
    });

    Ok(StreamingBody(ReaderStream::new(compressed_rx)))
}
NobodyXu commented 1 year ago

compat is there so that this crate can support not just tokio, but also async-std and potentially even more runtimes.

This is a workaround until a portable runtime interface is standardised.

Majored commented 1 year ago

This crate is absolutely wonderful, it saved me over a year ago when I was too dumb to figure out async_compression, and your package (back at 0.0.7) wrapped it and did all the dirty work for me.

Glad we could help. 😄

it no longer wraps async_compression. Why not?

Could you clarify what you mean exactly? I can't remember anything having drastically changed in that area.

It has introduced this Compat thing, which I did not need before.

You can use the with_tokio() constructor to avoid having to manually call .compat(). You can find more detail about this in the module-level docs.

I was hoping you wouldn't mind explaining this Compat situation and why it is now needed. In the process I hope to gain a better understanding of the Rust async ecosystem! Thank you!!

As alluded to above, the base implementation no longer depends on tokio in order to allow support for other runtimes. At the moment, Rust doesn't allow async functions in traits which means the standard library doesn't provide async versions of the IO traits. So these runtimes have to define their own which is where compat comes in to convert between the different traits.

Once there are stabilised async IO traits within the standard library, I'd expect almost everyone to start depending on them, and then we can switch the base implementation over (and only keep tokio for tokio-specific things like the fs reader).

arifd commented 1 year ago

Aha! Perfect explanation. Thank you! Makes a lot of sense.

If you look at the dependencies for 0.0.7 you'll see async_compression there. [screenshot of the 0.0.7 dependency list]

(I just assumed async_zip was a thin wrapper)

If you look at the deps for 0.0.15, async_compression has become optional.

Ahh... but now that I look at your feature list again, it makes sense: it was made optional so that users of async_zip can pick and choose what to compile! Which is great, because it's exactly why I bumped to 0.0.15! :D ... It still essentially depends on async_compression as much as it did.