awslabs / aws-crt-python

Python bindings for the AWS Common Runtime
Apache License 2.0

Use `anyio` to support an idiomatic `async` interface #558

Open Zac-HD opened 5 months ago

Zac-HD commented 5 months ago

Describe the feature

Reading that "All network operations in awscrt.http are asynchronous." caused me a brief moment of confusion, because the modern idiom for asynchronous programming in Python uses async/await rather than raw futures.

More specifically, the community is moving towards "structured concurrency" - with the Trio framework, new features in the stdlib asyncio module, and the anyio library (which allows libraries to work with the user's choice of Trio or asyncio).

Use Case

We're increasingly using async Python with Trio at work for all our dev tooling, and so I'd love to see aws-crt-python support syntactic asynchrony (async/await) on top of the current return-a-future design. This would also be useful for libraries such as https://github.com/aio-libs/aiobotocore/discussions/1106.

Proposed Solution

Looking through the docs, I see three things that would need to be made awaitable: futures, events, and websockets.

The simple cases

Happily, it's trivial to support a very efficient wrapper for Futures:

import anyio
import concurrent.futures

class WaitableFuture(concurrent.futures.Future):
    async def wait(self):
        # Bridge the done-callback onto an anyio.Event so we can await it
        # without blocking the event loop; by the time evt fires the result
        # is available, so result(timeout=0) returns immediately.
        evt = anyio.Event()
        self.add_done_callback(lambda _: evt.set())
        await evt.wait()
        return self.result(timeout=0)

# Users do a simple syntax-level transformation, e.g.:
client = awscrt.http.HttpClientConnection.new().result()  # current sync api
client = await awscrt.http.HttpClientConnection.new().wait()  # proposed

While it's always possible to build this kind of wrapper downstream, it'd be nice to do it once upstream and have that work for everyone.
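
For comparison, a downstream version of the same idea can be written as a free function over the plain concurrent.futures.Future objects that awscrt already returns. This is only a sketch of one possible approach (offloading the blocking result() call to a worker thread), not part of the proposal itself:

import anyio

async def await_future(future):
    # Run the blocking Future.result() call in a worker thread so the event
    # loop stays responsive; cancellable=True lets the awaiting task give up
    # without waiting for the future to finish.
    return await anyio.to_thread.run_sync(future.result, cancellable=True)

# Hypothetical downstream usage with today's API:
connection = await await_future(awscrt.http.HttpClientConnection.new())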

An awaitable version of threading.Event gives us the same pleasant user experience, at the cost of a gnarlier implementation: either we accept some pointless latency from a polling implementation, or the complexity of managing a 'waiter' thread:

# Again, users do a simple syntax-level transformation, e.g.:
loop = awscrt.io.EventLoopGroup()
loop.shutdown_event.wait()  # current sync api
await loop.shutdown_event.wait_async()  # either option below

# ---- implementation ----
import anyio
import threading
import weakref

class Event:
    def __init__(self, event: threading.Event, /) -> None:
        self.__event = event

    def wait_sync(self, timeout: float | None = None) -> bool:
        return self.__event.wait(timeout=timeout)

    # Polling from the main thread looks good from a code-complexity and system-resources
    # perspective, but incurs pointless delays of up to `interval` seconds.
    async def wait_async_polling(self, *, interval=0.1) -> bool:
        while not self.__event.is_set():
            await anyio.sleep(interval)
        return True

    # We can avoid that delay by using one 'waiter' thread per `await`ed threading.Event:
    _already_waiting_events: weakref.WeakKeyDictionary[threading.Event, anyio.Event] = (
        weakref.WeakKeyDictionary()
    )

    async def wait_async_worker_thread(self) -> bool:
        if self.__event in self._already_waiting_events:
            await self._already_waiting_events[self.__event].wait()
            return True

        def wait_for_event(threading_event, set_anyio_event):
            while not threading_event.wait(timeout=1.0):
                # If the `await` is cancelled, stop quickly even if the underlying event hasn't fired yet.
                anyio.from_thread.check_cancelled()
            anyio.from_thread.run_sync(set_anyio_event)
            return True

        self._already_waiting_events[self.__event] = anyio.Event()
        return await anyio.to_thread.run_sync(
            wait_for_event,
            self.__event,
            self._already_waiting_events[self.__event].set,
            cancellable=True,
            # We want to avoid a global threadpool limit and corresponding possibility
            # of deadlocks.  Since we're confident that there's at least one thread
            # doing useful work for each outstanding event and we ensure only one
            # waiter-thread for each threading.Event, we don't need a limit here.
            limiter=anyio.CapacityLimiter(1),
        )

If we could arrange for a callback when the event is set, that would allow for a more elegant implementation than simply wrapping the threading.Event, but I prefer not to require supporting changes for an initial version.
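
For illustration, a callback-based version might look something like the sketch below. notify_set is a hypothetical hook that doesn't exist today; the native layer would need to call it (on the event loop thread) when the underlying event fires:

import anyio

class CallbackBackedEvent:
    def __init__(self) -> None:
        self._waiters: list[anyio.Event] = []
        self._is_set = False

    def notify_set(self) -> None:
        # Hypothetical hook, called once by the native layer when the event fires.
        self._is_set = True
        for waiter in self._waiters:
            waiter.set()
        self._waiters.clear()

    async def wait_async(self) -> bool:
        # No polling and no waiter thread: just park on an anyio.Event until
        # notify_set() wakes us.
        if self._is_set:
            return True
        waiter = anyio.Event()
        self._waiters.append(waiter)
        await waiter.wait()
        return True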

More complicated cases

Higher-level wrappers for HTTP streams and websockets would also be nice - but even if those are desired, I'd suggest starting with the low-level async/await shims and encouraging some downstream experimentation before committing to e.g. exposing a wsproto-style interface.
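
As a rough sense check that the low-level shims compose, a whole request could read roughly like this, reusing the await_future helper sketched above (method and attribute names as in the current awscrt.http docs; connection arguments are omitted, so treat the details as illustrative):

connection = await await_future(awscrt.http.HttpClientConnection.new(host_name, port))
stream = connection.request(request, on_response=on_response, on_body=on_body)
stream.activate()
status = await await_future(stream.completion_future)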

Acknowledgements