uktrade / stream-zip

Python function to construct a ZIP archive on the fly
https://stream-zip.docs.trade.gov.uk/
MIT License

Any plans for async support? #87

Closed: Arssham closed this issue 1 year ago

Arssham commented 1 year ago

Hi. Thanks for this very useful utility!

Do you have any plans to add an asynchronous equivalent of stream_zip?

michalc commented 1 year ago

For now at least, not really.

You can use stream-zip as-is in async contexts without blocking by running it in a thread. The answer at https://github.com/uktrade/stream-zip/discussions/31#discussioncomment-5669355 does this, in FastAPI I think. I can't speak to its performance, however.

Are you asking about a specific context/use case?
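As a rough illustration of the "run it in a thread" idea, here is a minimal sketch. It is not the code from the linked discussion: blocking_chunks is a hypothetical stand-in for stream_zip(...) so the sketch runs without stream-zip installed, and iterate_in_thread is a hypothetical helper. The whole blocking iteration happens in one dedicated thread, and items come back to async code through a bounded asyncio.Queue, so a slow consumer applies backpressure to the producer.

```python
import asyncio
import threading


def blocking_chunks():
    # Hypothetical stand-in for a blocking generator such as stream_zip(member_files)
    for i in range(3):
        yield f"chunk-{i}".encode()


async def iterate_in_thread(make_iterable, maxsize=4):
    # Hypothetical helper: runs the whole blocking iteration in one dedicated
    # thread and passes items to async code through a bounded asyncio.Queue
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=maxsize)
    done = object()

    def put(entry):
        # Called from the producer thread: schedules queue.put on the event loop
        # and blocks only this thread while the queue is full (backpressure)
        asyncio.run_coroutine_threadsafe(queue.put(entry), loop).result()

    def produce():
        try:
            for item in make_iterable():
                put((None, item))
        except Exception as exc:
            put((exc, None))  # Hand the error over to the async side
        else:
            put((None, done))

    threading.Thread(target=produce, daemon=True).start()
    while True:
        exc, item = await queue.get()
        if exc is not None:
            raise exc
        if item is done:
            break
        yield item


async def main():
    return [chunk async for chunk in iterate_in_thread(blocking_chunks)]


print(asyncio.run(main()))  # [b'chunk-0', b'chunk-1', b'chunk-2']
```

One design caveat: if the async consumer stops iterating early, the producer thread stays blocked on the full queue, so a real version would need some cancellation handling.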

Arssham commented 1 year ago

In my case I need a function that accepts async iterables/iterators as input

Like the one here: https://github.com/kbbdy/zipstream/blob/master/examples/async_zip_stream.py

michalc commented 1 year ago

You could try something like this: an async_stream_zip function that uses asyncio.to_thread to call the sync stream_zip code in a separate thread, and then uses asyncio.run_coroutine_threadsafe to run async code on the event loop from that thread.

import asyncio
from stream_zip import stream_zip

# Async version of stream_zip that accepts an async iterable of member files,
# that itself has its data specified as an async iterable of bytes
async def async_stream_zip(async_member_files, *args, **kwargs):

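    # Runs in a worker thread: fetches each item by scheduling anext() on the event loop
    def sync_iterable(async_iterable):
        async_it = aiter(async_iterable)
        while True:
            try:
                yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
            except StopAsyncIteration:
                break

    # Swaps each member file's async chunks iterable (its 5th element) for a sync one
    def sync_member_files():
        for member_file in sync_iterable(async_member_files):
            yield member_file[0:4] + (sync_iterable(member_file[4]),)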

    # to_thread doesn't work with StopIteration raised in it, so we convert to a sentinel value
    def to_thread_safe_next():
        try:
            return next(zipped_chunks_it)
        except StopIteration:
            return done

    loop = asyncio.get_running_loop()
    zipped_chunks_it = iter(stream_zip(sync_member_files(), *args, **kwargs))
    done = object()

    while True:
        value = await asyncio.to_thread(to_thread_safe_next)
        if value is done:
            break
        yield value

Used, for example, as:

from datetime import datetime
from stat import S_IFREG
from stream_zip import ZIP_32

# Async iterable of data
async def get_data():
    yield b'Some bytes 1'
    yield b'Some bytes 2'

# Async iterable of member files
async def get_async_member_files():
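    yield (
        'my-file-1.txt',     # File name
        datetime.now(),      # Modification time
        S_IFREG | 0o600,     # Mode: regular file, owner read/write
        ZIP_32,              # Method
        get_data(),          # Async iterable of data chunks
    )
    yield (
        'my-file-2.txt',
        datetime.now(),
        S_IFREG | 0o600,
        ZIP_32,
        get_data(),
    )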

async def main():
    # Here we just print the zipped chunks, but in the real use case would
    # probably do something else with them, e.g. send across the network
    async for chunk in async_stream_zip(get_async_member_files()):
        print(chunk)

asyncio.run(main())

> In my case I need a function that accepts async iterables/iterators as input

But it would be good to have more detail about the context to make sure this solution is helpful. For example, where are the files coming from, and where are they being sent?

I think the above code only works on Python 3.10 onwards, since it uses the aiter and anext builtins added in that version. Note also that, unlike AioZipStream, this accepts an async iterable of member files, so the list of files itself can come from an async source, not just their data.

Edit: edited the above code a few times to simplify it
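If Python 3.9 support were needed (asyncio.to_thread exists there, but the aiter and anext builtins don't), the sync_iterable helper could probably be written against the __aiter__/__anext__ methods directly. A minimal sketch, assuming the loop is passed in explicitly rather than captured from the enclosing function as in the code above; this variant and the numbers generator are hypothetical, not part of stream-zip.

```python
import asyncio


def sync_iterable(async_iterable, loop):
    # Hypothetical Python 3.9-compatible variant: uses __aiter__/__anext__
    # directly instead of the aiter()/anext() builtins added in Python 3.10.
    # Must be consumed from a worker thread, never from the event loop itself.
    async_it = async_iterable.__aiter__()

    async def next_item():
        # A plain coroutine wrapper, since run_coroutine_threadsafe expects a coroutine
        return await async_it.__anext__()

    while True:
        try:
            yield asyncio.run_coroutine_threadsafe(next_item(), loop).result()
        except StopAsyncIteration:
            break


async def numbers():
    for i in range(3):
        await asyncio.sleep(0)
        yield i


async def main():
    loop = asyncio.get_running_loop()
    # Consume the bridged iterable in a worker thread (asyncio.to_thread is 3.9+)
    return await asyncio.to_thread(list, sync_iterable(numbers(), loop))


print(asyncio.run(main()))  # [0, 1, 2]
```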

michalc commented 1 year ago

Maybe this should be in stream-zip... it might be both useful enough and non-trivial enough...

Arssham commented 1 year ago

Thanks for the code you provided. It took me a while to figure out how it works. I noticed only one issue, in this line: yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result(). If async_it gets stuck for some reason, it will block the event loop. That is worrying.

Also, I'm not sure it is a good idea to wait synchronously for data coming from the network when we are in an asynchronous world. It would be nice if the event loop could skip over this code until the data appears. That seems impossible without changes/additions inside your library.

michalc commented 1 year ago

> I noticed only one issue, in this line: yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result(). If async_it gets stuck for some reason, it will block the event loop. That is worrying.

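Ah, this isn't being called from the event loop, but from a separate thread. So no matter how long it waits, it should not block the event loop: .result() only blocks that worker thread, while the event loop stays free to run the anext(async_it) coroutine and everything else.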
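A small sketch to check that claim, with hypothetical names blocking_wait and heartbeat: a worker thread blocks for about 0.1 s on .result() of a coroutine scheduled with run_coroutine_threadsafe, while a heartbeat task keeps ticking on the event loop. If the loop were blocked, the heartbeat would never get to run.

```python
import asyncio


def blocking_wait(loop):
    # Runs in a worker thread: schedules a slow coroutine on the event loop,
    # then blocks only this thread until its result is ready
    slow = asyncio.sleep(0.1, result=b"data")
    return asyncio.run_coroutine_threadsafe(slow, loop).result()


async def main():
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat():
        # Keeps counting only while the event loop is free to run it
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    data = await asyncio.to_thread(blocking_wait, loop)
    beat.cancel()
    return data, ticks


data, ticks = asyncio.run(main())
print(data, ticks > 0)  # b'data' True
```

The cost is elsewhere: each waiting call occupies a thread from the default executor, which has a limited pool size, so many concurrent streams would each tie up a thread.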

michalc commented 1 year ago

Ah, there is a short but, I think, informative post at https://stackoverflow.com/a/55077370/1319998 about the techniques used above.

(The to_thread function uses run_in_executor under the hood, I think.)
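For reference, in CPython asyncio.to_thread is a thin wrapper: it copies the current contextvars context and hands the call to loop.run_in_executor with the default executor. A sketch that mirrors it under the hypothetical name to_thread_sketch:

```python
import asyncio
import contextvars
import functools
import threading


async def to_thread_sketch(func, /, *args, **kwargs):
    # Mirrors asyncio.to_thread: copy the current contextvars context,
    # then run func in the loop's default executor via run_in_executor
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)


async def main():
    loop_thread = threading.get_ident()
    worker_thread = await to_thread_sketch(threading.get_ident)
    # The function ran in a worker thread, not in the event loop's thread
    return worker_thread != loop_thread


print(asyncio.run(main()))  # True
```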

michalc commented 1 year ago

Also, I have now asked (and answered) a question on Stack Overflow about a situation very similar to this: https://stackoverflow.com/questions/76991812/convert-generator-function-that-takes-a-sync-iterable-to-async-generator-functio/76991813#76991813

Arssham commented 1 year ago

Now I see... Thanks for showing me this hack; however, "If the implementation is hard to explain, it's a bad idea" (from the Zen of Python). I will definitely keep it in mind, but for now I'll use AioZipStream with its clear async interface, even though it doesn't seem to be maintained. And if I run into problems, I'll come back for your hack.

Please close this issue if you wish. And thanks once again!

michalc commented 1 year ago

I think I'll keep it open for a bit to ponder adding an async interface.

michalc commented 1 year ago

Ah, also: I was having a nose around AioZipStream, and I think it uses a similar pattern, passing sync code through loop.run_in_executor to run it in a separate thread: https://github.com/kbbdy/zipstream/blob/e09a6ce2b16f24ace3aad7aa8f9e6c08f85055d3/zipstream/aiozipstream.py#L38. So I would argue it's not that different from the async_stream_zip code above.

michalc commented 1 year ago

Ah, and it also looks like AioZipStream doesn't support the list of files itself coming from an async iterable, although whether this matters depends on the specific use case.

michalc commented 1 year ago

I think I've come to the conclusion that, for now at least, no async interface will be built into stream-zip. But I have documented how you could do it at https://stream-zip.docs.trade.gov.uk/async-interface/

Maybe this will be revisited down the line, but I suspect that will be far enough in the future that it won't help people with their current projects, so I'm closing this issue.