uktrade / stream-unzip

Python function to stream unzip all the files in a ZIP archive on the fly
https://stream-unzip.docs.trade.gov.uk/
MIT License
277 stars 12 forks

Feature/make it async #80

Closed: marcell-szabo closed this 7 months ago

marcell-szabo commented 8 months ago

Hello, recently I had to convert a codebase that depended on this library to use asyncio. To do this, I modified stream_unzip.py to accept an asynchronous generator as its data source and to run the entire unzipping process asynchronously. I would like to contribute the modification back to the original library, so here is my modified code.

Note that none of the changes touch the core functionality (unzipping, decrypting, etc.); only the generators and data passing have become asynchronous. To show this, I have rewritten the tests in test.py to check the new async behavior. The result can be seen here; all of them pass.

[Screenshot 2024-02-23 125027: test results]

michalc commented 8 months ago

Wow! Thanks for this

I have to admit I am unsure about the duplication. Instead, should there be a layer that allows calling all the existing sync code in stream_unzip from the async world? Something like the following (only briefly tested):

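import asyncio
from stream_unzip import stream_unzip

async def async_stream_unzip(chunks, *args, **kwargs):

    async def to_async_iterable(sync_iterable):
        # Each next() runs in a worker thread, so the event loop is never blocked.
        # to_thread errors if StopIteration is raised in it (it can't be set on a Future),
        # so we use a sentinel to detect the end
        done = object()
        it = iter(sync_iterable)
        while (value := await asyncio.to_thread(next, it, done)) is not done:
            yield value

    def to_sync_iterable(async_iterable):
        # Only ever iterated from a worker thread: each anext() is scheduled on the
        # event loop, and .result() blocks the worker thread, not the loop.
        # Note: aiter and anext are built-ins from Python 3.10
        done = object()
        async_it = aiter(async_iterable)
        while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
            yield value

    loop = asyncio.get_running_loop()
    unzipped_chunks = stream_unzip(to_sync_iterable(chunks), *args, **kwargs)

    # file_chunks rather than chunks, to avoid shadowing the function's argument
    async for name, size, file_chunks in to_async_iterable(unzipped_chunks):
        yield name, size, to_async_iterable(file_chunks)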
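Because the snippet above needs stream_unzip installed, here is a self-contained sketch of the same sync/async bridging pattern that can be run on its own. It assumes Python 3.9+ (for `asyncio.to_thread`). `upper_chunks` is a hypothetical stand-in for a sync streaming function such as `stream_unzip`, `async_wrap` is a hypothetical wrapper, and the stream is flat rather than the per-file `(name, size, chunks)` tuples that stream_unzip yields. It also swaps `aiter`/`anext` for `__aiter__`/`__anext__` so it does not depend on the Python 3.10 built-ins.

```python
import asyncio

def upper_chunks(chunks):
    # Hypothetical sync streaming function standing in for stream_unzip:
    # consumes an iterable of bytes and lazily yields transformed bytes
    for chunk in chunks:
        yield chunk.upper()

async def async_wrap(sync_func, async_chunks):
    loop = asyncio.get_running_loop()
    done = object()  # sentinel marking the end of either stream

    def to_sync_iterable(async_iterable):
        # Iterated only in worker threads: each step is scheduled on the
        # event loop, and only the worker thread blocks on .result()
        async_it = async_iterable.__aiter__()

        async def get_next():
            try:
                return await async_it.__anext__()
            except StopAsyncIteration:
                return done

        while (value := asyncio.run_coroutine_threadsafe(get_next(), loop).result()) is not done:
            yield value

    async def to_async_iterable(sync_iterable):
        # Each next() runs in a worker thread so the event loop stays free
        it = iter(sync_iterable)
        while (value := await asyncio.to_thread(next, it, done)) is not done:
            yield value

    async for value in to_async_iterable(sync_func(to_sync_iterable(async_chunks))):
        yield value

async def main():
    async def source():
        for chunk in (b"ab", b"cd"):
            await asyncio.sleep(0)  # simulate an async IO step
            yield chunk
    return [chunk async for chunk in async_wrap(upper_chunks, source())]

print(asyncio.run(main()))  # [b'AB', b'CD']
```

The sync function only ever sees a plain iterable, so none of its code has to change; the cost is a thread hop per chunk in each direction.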

Also, I would probably lean towards this being in the same module, stream_unzip.py, so that from the point of view of client code, the imports are always from stream_unzip.

And finally, running the tests, it looks like they don't pass on Python 3.6 and 3.7, since they use IsolatedAsyncioTestCase, which I think was only introduced in Python 3.8. I think I'm a fan of continuing to support older versions of Python unless we really can't for some reason.

Edit: simplified the above code a bit
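On the Python 3.6/3.7 point: one way to keep async tests runnable there is to use a plain `unittest.TestCase` and drive each coroutine on a fresh event loop, since async generators and async comprehensions are available from Python 3.6. This is a sketch only; `async_double`, `TestAsyncDouble` and `run_coro` are hypothetical names, not part of the repo's test.py.

```python
import asyncio
import unittest

async def async_double(values):
    # Hypothetical async generator standing in for the code under test
    for value in values:
        yield value * 2

class TestAsyncDouble(unittest.TestCase):
    # A plain TestCase rather than IsolatedAsyncioTestCase (Python 3.8+)

    def run_coro(self, coro):
        # Run one coroutine to completion on a fresh event loop, then close it
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    def test_doubles(self):
        async def collect():
            return [value async for value in async_double([1, 2, 3])]
        self.assertEqual(self.run_coro(collect()), [2, 4, 6])
```

This can be run with `python -m unittest` like any other test module.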

michalc commented 8 months ago

Ah, just realised that to_thread is, I think, Python 3.9 onwards. But... it looks like its code is simple (https://stackoverflow.com/a/65319037/1319998), so could we have our own version in stream-unzip?
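A minimal sketch of such a backport, modelled on CPython's own `asyncio.to_thread` rather than copied from the linked answer: it runs the function in the event loop's default `ThreadPoolExecutor` with a copy of the current contextvars. The positional-only `/` marker CPython uses is dropped so it also parses on Python 3.7; where it would live in stream-unzip is an open question, not something the thread settled.

```python
import asyncio
import contextvars
import functools
import threading

async def to_thread(func, *args, **kwargs):
    # Backport sketch of asyncio.to_thread (added in Python 3.9) for 3.7/3.8:
    # run func in the loop's default executor, carrying over contextvars
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

async def main():
    # The function runs in a worker thread, not the event loop's thread
    worker_ident = await to_thread(threading.get_ident)
    return worker_ident != threading.get_ident()

print(asyncio.run(main()))  # True
```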

michalc commented 8 months ago

Turning into a stream of consciousness a bit, but I am more and more a fan of not having the duplication, and instead having a thread-based layer in front of the existing sync code to avoid blocking the event loop. Unless the performance cost of such a layer is really tremendous, I suspect that for the vast majority of cases this will be good enough.

The main reason is maintainability, I think: making changes to this library already requires a lot of quite specific knowledge. If this were to go in, to make almost any change a maintainer would need all of:

  1. Good knowledge of the ZIP format
  2. Good knowledge of sync iterables/generators
  3. Good knowledge of async iterables/generators
  4. Willingness to keep two almost identical versions of the code in sync, which is usually seen as painful and error-prone.

Of the above, I suspect 1 and 2 are really inescapable, but with a thread-based layer, 3 and 4 are avoided for many classes of change. And on 3: as much as I do like async in general, I can't escape the fact that, at least anecdotally, many Python developers can't reason about it as well as they can about sync code. (Maybe I am being condescending here... not sure. But I wouldn't want my fear of being condescending to make me unrealistic.)

marcell-szabo commented 8 months ago

Hello there, yes, I think you are right: duplicated code is normally not ideal. I only did it because when I looked into your code, I quickly identified what needed changing for it to be async, so I changed it. I didn't really try to optimize, just quickly rewrote the parts that enable async iteration.

All in all, your proposed solution is much better. I agree with all the points you mentioned, and very much welcome the interest in incorporating async functionality.

michalc commented 8 months ago

Recently I had to transform a codebase

I'm curious: can I ask what this transformation involved in more detail, and why it was decided on? Essentially, I'd like to know a bit about how stream-unzip is used, to inform future changes.

marcell-szabo commented 8 months ago

Sure. We have a task where we run anomaly detection on a lot of files, and for that we have to fetch them. Some files were in ZIP archives, so they needed to be unzipped (your lib was/is being used for this). Recently, since a lot of IO was involved, and since Python is notoriously bad at multithreading while async execution suits IO-bound tasks, we decided to switch to asyncio for both log fetching and local file management.

We also implemented a “semi-multiservice” architecture where services can be scaled horizontally, but this async transformation was at the heart of it. With this architecture we achieved a non-negligible speed-up compared to fully synchronous processing.

michalc commented 8 months ago

Ah good to know - thanks!

michalc commented 8 months ago

Inspired a bit by this PR and discussion, have just added async support (via threads) to stream-zip: https://github.com/uktrade/stream-zip/pull/114

michalc commented 7 months ago

@marcell-szabo To check, would you be ok with converting this PR to use a threads-based layer like the one in this message, and like the threads-based layer that's now in stream-zip?

No is of course fine, but if it is a no, I/we might do it ourselves.

michalc commented 7 months ago

As discussed, a threads-powered async interface has now been released in v0.0.91