Majored / rs-async-zip

An asynchronous ZIP archive reading/writing crate.

Async ZIP Streaming?... #118

Closed: inzanez closed this issue 8 months ago

inzanez commented 10 months ago

Hi

I was wondering whether it's possible to create ZIP files on the fly so that one could stream them through an HTTP connection, for instance. But reading the issues and the docs, I'm not sure whether that's supported at all...

Majored commented 8 months ago

Sorry for the delayed response here. This would definitely be possible - ZipFileWriter is generic over AsyncWrite. So you'd probably want to get a TcpStream and then use the with_tokio() method.
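
In code, that suggestion might look something like the sketch below (module paths assume async_zip's tokio feature; the surrounding server loop and error handling are omitted):

use async_zip::tokio::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder, ZipString};
use tokio::net::TcpStream;

async fn stream_zip_over_tcp(socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    // ZipFileWriter accepts any AsyncWrite, so the archive bytes go
    // straight out over the socket as entries are written.
    let mut writer = ZipFileWriter::with_tokio(socket);

    let entry = ZipEntryBuilder::new(ZipString::from("hello.txt"), Compression::Deflate).build();
    writer.write_entry_whole(entry, b"Hello over the wire!").await?;

    writer.close().await?;
    Ok(())
}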

inzanez commented 8 months ago

I'm trying to use a tokio DuplexStream for the writer. When trying to add files to the ZipFileWriter, I currently always get an UpstreamReadError(Kind(BrokenPipe)), even when the data is static rather than coming from a file:

let ze = ZipEntryBuilder::new(ZipString::from("test"), Compression::Deflate).build();
let data = b"This is an example file.";
w.write_entry_whole(ze, data).await.expect("Failed to write entry");
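
For anyone hitting the same error: with tokio's in-memory duplex, a write fails with BrokenPipe once the read half has been dropped, and it stalls once the internal buffer is full, so the receiving end has to stay alive and be drained concurrently. A minimal standalone sketch of that pattern (unrelated to async_zip itself):

use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = duplex(64);

    // Drain the read half on its own task so writes can make progress.
    let drain = tokio::spawn(async move {
        let mut buf = [0u8; 64];
        loop {
            match rx.read(&mut buf).await {
                Ok(0) | Err(_) => break, // EOF or error: stop draining
                Ok(_) => {}              // discard the bytes for this demo
            }
        }
    });

    tx.write_all(b"this would be the archive bytes").await.unwrap();
    drop(tx); // dropping the write half signals EOF to the reader
    drain.await.unwrap();
}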
inzanez commented 8 months ago

And with a custom writer implementation, it seems to stop writing to the ZIP file after exactly 32 entries.

inzanez commented 8 months ago

OK, maybe this helps somebody else too. Here is some sample code for receiving and unpacking a streamed ZIP, as well as for streaming ZIP generation, with the Rocket web framework:

Receiving a ZIP file:

// Assumed imports for this sample:
use async_zip::base::read::stream::ZipFileReader;
use rocket::data::{Data, ToByteUnit};
use rocket::post;
use tokio::io::AsyncReadExt;

#[post("/zip", format = "plain", data = "<data>")]
async fn zip_receive(data: Data<'_>) {
    let ds = data.open(1.tebibytes());
    let mut reader = ZipFileReader::with_tokio(ds);

    let mut buf = [0; 65536];

    // next_with_entry() consumes the reader and yields the next entry, if any.
    while let Some(mut x) = reader.next_with_entry().await.expect("Failed to read next entry") {
        let e = x.reader().entry();
        println!("{}", e.filename().as_str().expect("Non-UTF-8 entry filename"));

        // Actual extraction would need to happen here,...
        // for now the entry's bytes are just drained.
        loop {
            let read = x.reader_mut().read(&mut buf).await.expect("Failed to read from entry");

            if read == 0 {
                break;
            }
        }

        // done() finishes the entry and hands the reader back for the next one.
        reader = x.done().await.expect("Failed to finish entry");
    }
}
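
(Side note: when doing the actual extraction, the entry filename should be sanitized before being used as an output path, since archive entries can contain absolute or ../ components, the classic zip-slip issue.)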

Generating a ZIP file:

// Assumed imports for this sample:
use async_zip::tokio::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder, ZipString};
use rocket::get;
use rocket::response::stream::ByteStream;
use tokio::io::{duplex, AsyncReadExt};
use walkdir::WalkDir;

#[get("/zip")]
async fn zip_stream() -> ByteStream![Vec<u8>] {
    let (sender, mut receiver) = duplex(65536);
    let base_path = std::path::Path::new("/some/dir/to/zip");

    // Build the archive on a separate task; its bytes flow through the
    // duplex stream into the response body below.
    let t = tokio::task::spawn(async move {
        let mut w = ZipFileWriter::with_tokio(sender).force_zip64();

        for e in WalkDir::new(base_path) {
            let e = e.unwrap();

            if e.metadata().unwrap().is_file() {
                let p = e.path().strip_prefix(base_path.parent().unwrap()).unwrap().to_str().unwrap();
                let ze = ZipEntryBuilder::new(ZipString::from(p), Compression::Stored).build();
                let data = tokio::fs::read(e.path()).await.expect("Could not read file");
                w.write_entry_whole(ze, &data).await.expect("Could not write entry");
            }
        }

        w.close().await.expect("Failed to close writer");
    });

    let mut buf = vec![0; 65536];

    ByteStream! {
        loop {
            let read = receiver.read(&mut buf).await.expect("Failed to read from duplex stream");
            if read > 0 {
                yield buf[0..read].to_vec();
            } else if t.is_finished() {
                break;
            }
        }
    }
}
Wamy-Dev commented 7 months ago

Would this support folders too?

inzanez commented 7 months ago

@Wamy-Dev yes; since it walks the directory tree, that would work. If you really want to write the ZIP in a streaming fashion, you should not use write_entry_whole (as that needs to allocate at least as much memory as the size of the file being written), but write_entry_stream (the API is a bit different). Happy to paste a sample.

Wamy-Dev commented 7 months ago

That would be awesome! Thank you. Yeah, I deal with some large files so streaming is the best option.

inzanez commented 7 months ago

Something like this should do. An import like use tokio_util::compat::FuturesAsyncWriteCompatExt; might be required:

// Assumed imports for this sample:
use async_zip::tokio::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder, ZipString};
use rocket::get;
use rocket::response::stream::ByteStream;
use tokio::io::{duplex, AsyncReadExt};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use walkdir::WalkDir;

#[get("/zip")]
async fn zip_stream() -> ByteStream![Vec<u8>] {
    let (sender, mut receiver) = duplex(65536);
    let base_path = std::path::Path::new("/some/dir/to/zip");

    let t = tokio::task::spawn(async move {
        let mut w = ZipFileWriter::with_tokio(sender).force_zip64();

        for e in WalkDir::new(base_path) {
            let e = e.unwrap();

            if e.metadata().unwrap().is_file() {
                let p = e.path().strip_prefix(base_path.parent().unwrap()).unwrap().to_str().unwrap();
                let ze = ZipEntryBuilder::new(ZipString::from(p), Compression::Stored).build();
                let mut f = tokio::fs::File::open(e.path())
                    .await
                    .expect("Could not open file");

                // Stream the file into the entry instead of buffering it whole.
                let ew = w.write_entry_stream(ze).await.expect("Could not open entry stream");
                let mut cc = ew.compat_write();
                tokio::io::copy(&mut f, &mut cc).await.expect("Could not copy file into entry");
                cc.into_inner().close().await.expect("Could not close entry stream");
            }
        }

        w.close().await.expect("Failed to close writer");
    });

    let mut buf = vec![0; 65536];

    ByteStream! {
        loop {
            let read = receiver.read(&mut buf).await.expect("Failed to read from duplex stream");
            if read > 0 {
                yield buf[0..read].to_vec();
            } else if t.is_finished() {
                break;
            }
        }
    }
}
Wamy-Dev commented 7 months ago

Thank you so much! One last thing: I've been searching for it around here, but I can't seem to figure it out. Is there a way to calculate the final size of the ZIP so I can pass it through a header?

I tried adding up the file sizes and the headers, but my math might be wrong, since it always fails at the end.

Edit: I am only using Stored compression. It should work fine this way, but it doesn't.

inzanez commented 7 months ago

Not that I am aware of. I use a streaming receiver (JavaScript stream to file).

Wamy-Dev commented 7 months ago

ah, thanks anyways

inzanez commented 7 months ago

There is still a header for every entry in the ZIP (although not fully populated), plus the central directory, so you cannot just add up all the file sizes even when using Stored. There might also be some padding. I think it should be doable, but you'd need to test.
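
As a starting point, a rough lower bound for a Stored archive can be derived from the fixed ZIP record sizes. The sketch below assumes no extra fields, no data descriptors, and no zip64 records; a streaming writer (especially with force_zip64()) adds data descriptors and zip64 extra fields on top of this, so the real total will be larger:

/// Rough lower-bound size of a Stored (uncompressed) archive, given
/// (entry name, data length) pairs. Illustrative only: extra fields,
/// data descriptors, and zip64 records are not accounted for.
fn estimate_stored_zip_size(entries: &[(&str, u64)]) -> u64 {
    const LOCAL_HEADER: u64 = 30; // fixed part of each local file header
    const CENTRAL_HEADER: u64 = 46; // fixed part of each central directory record
    const EOCD: u64 = 22; // end-of-central-directory record

    let mut total = EOCD;
    for (name, data_len) in entries {
        let name_len = name.len() as u64;
        total += LOCAL_HEADER + name_len + data_len; // local header + raw data
        total += CENTRAL_HEADER + name_len; // matching central directory entry
    }
    total
}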

Wamy-Dev commented 7 months ago

Using large files, the code you provided @inzanez still takes up an equivalent amount of memory :(. Is there a way around this? I can't seem to figure it out. For example, I am trying a 150 GB ZIP of lots of medium-sized files, around 3-4 GB each. Perhaps it reads the entire file into memory?