uktrade / stream-zip

Python function to construct a ZIP archive on the fly
https://stream-zip.docs.trade.gov.uk/
MIT License

Asyncio support without threading #135

Open sashkent3 opened 2 months ago

sashkent3 commented 2 months ago

Under the hood, async_stream_zip uses threads as a layer over the synchronous stream_zip function. Would you accept a more flexible 'pure-asyncio' implementation? This would still allow running async_stream_zip via threads; however, in my limited testing, simply using asyncio.run gives a 2-5% performance benefit over the current async_stream_zip implementation.

I have thrown together a quick and dirty implementation which is intentionally kept as close to the original as possible. I'm happy to contribute it as a PR for further review and refinement.
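To make the thread-based layering concrete, here is a minimal sketch of the general pattern (not stream-zip's actual code): a synchronous generator consumed from async code by running each `next()` call in the default thread pool via `asyncio.to_thread` (Python 3.9+). `sync_chunks` and `async_via_thread` are hypothetical names, and only the output direction is shown.

```python
import asyncio

def sync_chunks():
    # Hypothetical stand-in for the synchronous stream_zip generator
    yield b"chunk-1"
    yield b"chunk-2"

async def async_via_thread(iterable):
    """Hypothetical thread-bridged wrapper: every next() on the sync
    iterator runs in the default thread pool via asyncio.to_thread."""
    iterator = iter(iterable)
    done = object()  # sentinel that next() returns once the iterator is exhausted
    while True:
        chunk = await asyncio.to_thread(next, iterator, done)
        if chunk is done:
            break
        yield chunk

async def main():
    return [chunk async for chunk in async_via_thread(sync_chunks())]

print(asyncio.run(main()))  # [b'chunk-1', b'chunk-2']
```

In this sketch every chunk costs a hand-off to a worker thread and back, which is the kind of overhead a pure-asyncio version would avoid.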

michalc commented 2 months ago

Would you accept a more flexible 'pure-asyncio' implementation?

Hmmm... I guess maybe. I'm torn about the duplication that would then have to be maintained indefinitely, kinda doubling the work for all changes in a way. I would love a pure asyncio implementation that doesn't have duplication with the sync implementation...

however in my limited testing simply using asyncio.run provides a 2-5% performance benefit over the current async_stream_zip

Can you give more details on this? Specifically, since asyncio.run runs a coroutine, it's not obvious to me how it can replace the coroutine async_stream_zip.

sashkent3 commented 2 months ago

I would love a pure asyncio implementation that doesn't have duplication with the sync implementation...

I understand your concerns; however, to my knowledge, avoiding the code duplication while keeping proper implementations for both cases isn't possible. The best alternative I've considered is maintaining the async implementation and rewriting the sync implementation on top of it. For example:

import asyncio

async def async_stream_zip():
    # implementation goes here
    pass

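def sync_stream_zip():
    try:
        # Reuse the loop if one is already running in this thread...
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # ...otherwise create and register one for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    # Caveat: run_until_complete raises RuntimeError if the loop is already
    # running, i.e. when sync_stream_zip is called from inside async code
    return loop.run_until_complete(async_stream_zip())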

I'm not 100% certain, though, whether such an implementation is fully backwards compatible (in the context of running the sync stream_zip from an async environment). The most common practice is to maintain split implementations.
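One way to see the compatibility question concretely is a minimal sketch, with hypothetical names (`async_chunks`, `sync_chunks`), of a sync generator built on a single async generator. Unlike the snippet above, it drives the async generator one chunk at a time with `loop.run_until_complete(agen.__anext__())` on a private loop, so output stays streamed. It works from plain sync code, but it cannot be called from code that is already running inside an event loop, which is the kind of gap the comment above worries about.

```python
import asyncio

async def async_chunks():
    # Hypothetical stand-in for the single, maintained async implementation
    yield b"chunk-1"
    yield b"chunk-2"

def sync_chunks():
    """Hypothetical sync wrapper: drives async_chunks() one chunk at a time
    on a private event loop, so the output is still streamed."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # no loop running in this thread, so a private one can run
    else:
        # asyncio would refuse anyway ("Cannot run the event loop while
        # another loop is running"); fail early with the same exception type
        raise RuntimeError("sync wrapper called from a running event loop")
    loop = asyncio.new_event_loop()
    agen = async_chunks()
    try:
        while True:
            try:
                chunk = loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
            yield chunk
    finally:
        # Closing agen on early exit (agen.aclose()) is omitted for brevity
        loop.close()

print(list(sync_chunks()))  # [b'chunk-1', b'chunk-2']

async def called_from_async_code():
    try:
        return list(sync_chunks())
    except RuntimeError as error:
        return f"RuntimeError: {error}"

print(asyncio.run(called_from_async_code()))  # RuntimeError: sync wrapper called from a running event loop
```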

Can you give more details on this?

This is the crucial line in the current implementation. It essentially runs stream-zip in a thread pool. In contrast, asyncio runs coroutines on an event loop built around non-blocking IO. This is often more efficient because it eliminates the need to spawn relatively heavy OS threads (see Green thread on Wikipedia). Furthermore, asyncio.run would not be part of the implementation at all. We would simply provide the async definition and leave the choice of how to run it to the user (asyncio, uvloop, greenlet, eventlet, etc.).
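To illustrate "provide the async definition and leave running it to the user", here is a minimal sketch of a thread-free async generator. The names `async_deflate` and `source` are hypothetical, and it only compresses with raw deflate (the variant used for deflated ZIP members) without writing any ZIP headers. The caller decides how to drive it; here that is `asyncio.run`.

```python
import asyncio
import zlib

async def async_deflate(chunks):
    """Hypothetical pure-asyncio generator: raw-deflates an async iterable of
    bytes on the event loop's own thread, with no thread pool involved."""
    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate, as in ZIP members
    async for chunk in chunks:
        compressed = compressor.compress(chunk)
        if compressed:
            yield compressed
    yield compressor.flush()

async def source():
    for part in (b"hello ", b"world"):
        await asyncio.sleep(0)  # stand-in for real non-blocking IO
        yield part

async def main():
    compressed = b"".join([c async for c in async_deflate(source())])
    return zlib.decompress(compressed, wbits=-zlib.MAX_WBITS)

# The caller chooses how to run it; asyncio.run is just one option
print(asyncio.run(main()))  # b'hello world'
```

One consequence of this style: compression is CPU-bound, so it runs on the event loop thread between awaits, and a very large chunk would block other tasks while it is being compressed. The thread-based approach keeps that work off the loop at the cost of thread hand-offs.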

michalc commented 1 month ago

The best alternative I've considered is maintaining the async implementation and rewriting the sync implementation through it. For example: [...] I'm not 100% certain though if such an implementation is fully backwards compatible (in the context of running sync stream_zip from an async environment)

My suspicion is that it indeed won't be fully backwards compatible in a number of ways, and having an event loop running in what is otherwise regular sync Python code seems somehow... over-engineered. It might even introduce a performance penalty of its own.

Furthermore, asyncio.run would not be included in the implementation at all.

Ah yes, I see.

The most common practice is to maintain split implementations.

So I suspect we would do this. However, for a 2-5% performance benefit... I think I'm a bit anti. I can't help but suspect the current asyncio performance is good enough for almost everyone (maybe even including... you?), and squeezing out a few % at the price of an increased risk of incorrect behaviour, or of slower fixes in future (by me or other future maintainers) because of maintaining almost-identical implementations, might not be worth it.

michalc commented 1 month ago

So there is another option that's crossing my mind... is it possible to factor out all the logic into some core code that is used by both the sync and async versions? Not sure what that would look like, but it should be possible?

michalc commented 1 month ago

It could maybe define a StreamZipper or something, working a bit like this. For sync:

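def stream_zip(member_files):
    # StreamZipper: proposed shared core (hypothetical), called without await in both versions
    start_file, flush_zip = StreamZipper()

    for member_file in member_files:
        # member_file is (name, modified_at, mode, method, chunks)
        push_data, flush_member_file = start_file(member_file[0:4])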

        for chunk in member_file[4]:
            yield push_data(chunk)

        yield flush_member_file()

    yield flush_zip()

and then for async:

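async def async_stream_zip(member_files):
    # Same shared core; only the loops differ (async for instead of for)
    start_file, flush_zip = StreamZipper()

    async for member_file in member_files:
        # member_file is (name, modified_at, mode, method, chunks); chunks is an async iterable here
        push_data, flush_member_file = start_file(member_file[0:4])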

        async for chunk in member_file[4]:
            yield push_data(chunk)

        yield flush_member_file()

    yield flush_zip()

Might not have thought it all through, but that's the sort of idea...
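To check that this StreamZipper shape can actually work, here is a minimal, self-contained sketch of such a core. Everything in it is hypothetical: the internals of `StreamZipper`, the `*_sketch` wrapper names, the `as_async` helper, and the toy limits (always deflate with a trailing data descriptor, no ZIP64, the `method` field ignored, one member at a time). It is not stream-zip's code; it only shows that the two wrappers can stay thin loops while every byte-level decision lives in one shared, IO-free place.

```python
import asyncio
import io
import struct
import zipfile
import zlib
from datetime import datetime

def StreamZipper():
    """Hypothetical sans-IO ZIP core returning (start_file, flush_zip).
    Nothing here reads, writes or awaits; each function just returns bytes.
    Toy limits: always deflate (method ignored), no ZIP64, one member at a time."""
    offset = 0    # total bytes handed out so far
    central = []  # central directory records, emitted by flush_zip

    def emit(data):
        nonlocal offset
        offset += len(data)
        return data

    def start_file(metadata):
        name, modified_at, mode, _method = metadata
        name_b, t = name.encode("utf-8"), modified_at
        dos_time = (t.hour << 11) | (t.minute << 5) | (t.second // 2)
        dos_date = ((t.year - 1980) << 9) | (t.month << 5) | t.day
        header_offset = offset
        # Flags: bit 3 = CRC/sizes follow in a data descriptor, bit 11 = UTF-8 name
        pending = [struct.pack("<IHHHHHIIIHH", 0x04034B50, 20, 0x0808, 8, dos_time,
                               dos_date, 0, 0, 0, len(name_b), 0) + name_b]
        compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate
        st = {"crc": 0, "csize": 0, "usize": 0}
        def compressed(data):
            st["csize"] += len(data)
            # The local header goes out with the member's first output
            return emit((pending.pop() if pending else b"") + data)
        def push_data(chunk):
            st["crc"] = zlib.crc32(chunk, st["crc"])
            st["usize"] += len(chunk)
            return compressed(compressor.compress(chunk))
        def flush_member_file():
            data = compressed(compressor.flush())
            central.append(struct.pack(
                "<IHHHHHHIIIHHHHHII", 0x02014B50, (3 << 8) | 20, 20, 0x0808, 8,
                dos_time, dos_date, st["crc"], st["csize"], st["usize"],
                len(name_b), 0, 0, 0, 0, mode << 16, header_offset) + name_b)
            return data + emit(struct.pack("<IIII", 0x08074B50, st["crc"],
                                           st["csize"], st["usize"]))
        return push_data, flush_member_file

    def flush_zip():
        directory = b"".join(central)
        end = struct.pack("<IHHHHIIH", 0x06054B50, 0, 0, len(central),
                          len(central), len(directory), offset, 0)
        return emit(directory + end)
    return start_file, flush_zip

def stream_zip_sketch(member_files):
    start_file, flush_zip = StreamZipper()
    for member_file in member_files:
        push_data, flush_member_file = start_file(member_file[0:4])
        for chunk in member_file[4]:
            yield push_data(chunk)
        yield flush_member_file()
    yield flush_zip()

async def async_stream_zip_sketch(member_files):
    start_file, flush_zip = StreamZipper()
    async for member_file in member_files:
        push_data, flush_member_file = start_file(member_file[0:4])
        async for chunk in member_file[4]:
            yield push_data(chunk)
        yield flush_member_file()
    yield flush_zip()

async def as_async(member_files):
    # Re-wrap the same members, and each chunk list, as async iterables
    async def chunks(items):
        for item in items:
            yield item
    for name, modified_at, mode, method, data in member_files:
        yield name, modified_at, mode, method, chunks(data)

modified_at = datetime(2024, 1, 2, 3, 4, 6)
files = [("a.txt", modified_at, 0o100644, None, [b"hello ", b"world"]),
         ("empty.txt", modified_at, 0o100644, None, [])]
zipped = b"".join(stream_zip_sketch(files))
print(zipfile.ZipFile(io.BytesIO(zipped)).read("a.txt"))  # b'hello world'

async def collect():
    return b"".join([c async for c in async_stream_zip_sketch(as_async(files))])

print(asyncio.run(collect()) == zipped)  # True
```

Because the core never awaits, both wrappers produce byte-identical output from the same members. The flip side is that the async wrapper still does CPU-bound compression on the event loop thread, which is the same trade-off discussed earlier in the thread.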