enthought / traits

Observable typed attributes for Python classes

Async observer dispatch #1770

Closed corranwebster closed 3 months ago

corranwebster commented 5 months ago

It would be nice to support dispatch via asyncio or other similar async systems using traits.

It's not hard to create a dispatcher that dispatches regular functions asynchronously:

import asyncio

def dispatch_asyncio(handler, event):
    try:
        loop = asyncio.get_event_loop()
        # schedule the handler on the loop; safe to call from any thread
        loop.call_soon_threadsafe(handler, event)
    except RuntimeError:
        # no loop currently; dispatch synchronously
        handler(event)

which could then be hooked up via _ObserverDispatchers["async"] = dispatch_asyncio and invoked with dispatch="async". But this is only half the problem: since the callback is a regular function, it will block the event loop while it executes. This is, of course, analogous to what happens with dispatch="ui".
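To make the blocking concern concrete, here is a minimal sketch that uses only asyncio (no Traits). The names ticker and slow_handler are made up for illustration, and time.sleep stands in for whatever slow synchronous work a real handler might do:

```python
import asyncio
import time

ticks = 0       # incremented by a background coroutine
observed = []   # (ticks before, ticks after) as seen inside the handler


async def ticker():
    # Stand-in for other work the event loop should keep servicing.
    global ticks
    while True:
        ticks += 1
        await asyncio.sleep(0.001)


def slow_handler(event):
    # A regular (non-async) handler: while it runs, nothing else can.
    before = ticks
    time.sleep(0.05)  # simulates slow synchronous work
    observed.append((before, ticks))


async def main():
    background = asyncio.create_task(ticker())
    await asyncio.sleep(0.01)  # let the ticker run a few times
    loop = asyncio.get_running_loop()
    loop.call_soon_threadsafe(slow_handler, "event")
    await asyncio.sleep(0.01)  # give the handler a chance to run
    background.cancel()


asyncio.run(main())
```

The two numbers recorded by slow_handler are equal: the ticker makes no progress at all while the handler is running, even though the handler was scheduled through the loop.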

But it would be really nice to be able to have async observers to handle longer running tasks in the background (I/O in particular, but also possibly GUI tasks). Something like:

from traits.api import Bytes, HasTraits, Str, observe

class RemoteResource(HasTraits):
    url = Str()
    data = Bytes()

    @observe('url')
    async def _download_data(self, event):
        # probably need more sanity checking, but this is the general idea
        self.data = await get_url_async(event.new)

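This raises some design issues, though:

  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)
  • there needs to be some way of awaiting the completion of the async handlers, I think

For the second point, something like the following might work: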

import asyncio

_handler_tasks = set()

async def gather_handlers():
    # probably a better way of doing this
    while _handler_tasks:
        await asyncio.sleep(0)

def dispatch_async_handler(handler, event):
    task = asyncio.create_task(handler(event))
    _handler_tasks.add(task)
    task.add_done_callback(_handler_tasks.discard)

You could imagine this being used something like the following:

async def get_resources(urls):
    resources = [RemoteResource(url=url) for url in urls]
    await gather_handlers()
    return resources

although I also think that if async-based handlers are used it will be much more natural to chain observers: if you need to react when the download is done, just observe data and assume that the app/server code is doing a loop.run_forever() somewhere.

@mdickinson Thoughts?

mdickinson commented 5 months ago
  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)

On another project (using Panel rather than Traits), I had a need for a fourth option: schedule the task to be executed when the event loop starts. What Panel did instead was to start an event loop and call run, then block until the task completed, but that was undesirable for the situation I was in, since we wanted a task to start at application startup and run in parallel with the application.

So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.

  • there needs to be some way of awaiting the completion of the async handlers, I think

I'm not sure we need to do anything special here. A fire-and-forget model works pretty well with async, and it's the basis of how things work in JavaScript. Individual async handlers can take care to update state to mark completion if that's what they need. Avoiding interactions from incomplete cleanup in unit tests is generally easy to achieve by using IsolatedAsyncioTestCase from the std. lib.

So I can't think of any use-cases that would require having Traits do bookkeeping of running coroutines beyond the book-keeping already being performed by asyncio. Project-specific bookkeeping can be handled in the coroutines themselves.
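For illustration, a test along these lines might look like the sketch below. Resource is a hypothetical stand-in for a HasTraits class with an async observer; its handler marks completion itself with an asyncio.Event, and IsolatedAsyncioTestCase gives each test its own event loop.

```python
import asyncio
import unittest


class Resource:
    # Hypothetical stand-in for a HasTraits object with an async observer.
    def __init__(self):
        self.data = None
        self.done = asyncio.Event()  # the handler marks its own completion

    async def _download_data(self, url):
        await asyncio.sleep(0)  # placeholder for real async I/O
        self.data = f"contents of {url}"
        self.done.set()


class TestResource(unittest.IsolatedAsyncioTestCase):
    async def test_download(self):
        resource = Resource()
        # Fire and forget; keep a reference so the task is not collected.
        task = asyncio.create_task(
            resource._download_data("http://example.invalid")
        )
        await asyncio.wait_for(resource.done.wait(), timeout=1.0)
        self.assertEqual(resource.data, "contents of http://example.invalid")
        await task


# Run the test case explicitly instead of via unittest.main().
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestResource)
result = unittest.TextTestRunner().run(suite)
```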

corranwebster commented 5 months ago
  • what happens if there is no async event loop running? (nothing? error? start an event loop and call run?)

... So I'd vote for either (a) error, or (d) schedule the task to be executed when the event loop starts running.

Having things error is certainly the easiest, and probably a reasonable first step. It's easier to make a change that does something sensible instead of an error condition than to do something wrong and then have to change all the code that depends on it.

  • there needs to be some way of awaiting the completion of the async handlers, I think

I'm not sure we need to do anything special here. A fire-and-forget model works pretty well with async, and it's the basis of how things work in JavaScript. Individual async handlers can take care to update state to mark completion if that's what they need. Avoiding interactions from incomplete cleanup in unit tests is generally easy to achieve by using IsolatedAsyncioTestCase from the std. lib.

So I can't think of any use-cases that would require having Traits do bookkeeping of running coroutines beyond the book-keeping already being performed by asyncio. Project-specific bookkeeping can be handled in the coroutines themselves.

FWIW, I just implemented what I think may be the dumbest possible approach in an async Flet app and it just worked. I just replaced dispatch_same with:

import asyncio

_handler_tasks = set()

def dispatch_same(handler, event):
    if asyncio.iscoroutinefunction(handler):
        task = asyncio.create_task(handler(event))
        _handler_tasks.add(task)
        task.add_done_callback(_handler_tasks.discard)
    else:
        handler(event)

I think we do need the book-keeping of the _handler_tasks set to prevent the tasks being garbage collected, but in practice (and in particular, where the observers are being manually connected in a context where we know that the async event loop exists) it seems that we don't need to worry about gathering the tasks.
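One consequence worth spelling out is ordering: with this dispatcher, regular handlers still run immediately, while async handlers only start once the event loop regains control. A small standalone sketch (no Traits; the handlers and the calls list are made up, and I've used inspect.iscoroutinefunction here since I believe newer Python versions deprecate the asyncio spelling):

```python
import asyncio
import inspect

_handler_tasks = set()
calls = []


def dispatch_same(handler, event):
    if inspect.iscoroutinefunction(handler):
        task = asyncio.create_task(handler(event))
        _handler_tasks.add(task)
        task.add_done_callback(_handler_tasks.discard)
    else:
        handler(event)


async def async_handler(event):
    calls.append(("async", event))


def sync_handler(event):
    calls.append(("sync", event))


async def main():
    dispatch_same(async_handler, "changed")  # scheduled, not yet run
    dispatch_same(sync_handler, "changed")   # runs immediately
    await asyncio.gather(*_handler_tasks)    # let the async handler finish


asyncio.run(main())
# calls is now [("sync", "changed"), ("async", "changed")]
```

So even though the async handler was dispatched first, the sync handler's side effects are visible first. Code that relies on observers having run by the time a trait assignment returns would need to take that into account.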

mdickinson commented 4 months ago

I think we do need the book-keeping of the _handler_tasks set to prevent the tasks being garbage collected

Ah yes, that makes sense.

The other half of async support that I think I'd want at some point would be the ability to await a Trait change in a coroutine.

corranwebster commented 4 months ago

The other half of async support that I think I'd want at some point would be the ability to await a Trait change in a coroutine.

Yes, that would be nice, something like:

event = await obj.await_trait_change("foo.bar")

where event is the appropriate trait change event. Not quite sure what the implementation would look like - possibly something which uses an asyncio.Event internally that gets set by an observer on the trait. Something which looks a bit like:

import asyncio

async def await_trait_change(obj, trait):
    async_event = asyncio.Event()
    trait_event = None

    def handler(event):
        nonlocal trait_event
        trait_event = event
        async_event.set()

    obj.observe(handler, trait)
    try:
        await async_event.wait()
    finally:
        # always unhook the observer, even if the wait is cancelled
        obj.observe(handler, trait, remove=True)
    return trait_event

It would be nicer if asyncio.Event could carry a payload, but that doesn't seem to be the case.
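One alternative that does carry a payload is an asyncio.Future, whose result can be the trait change event itself. The sketch below is an assumption-laden variant of the function above: Observable is a hypothetical stand-in that only mimics the observe(handler, name, remove=...) call shape, so the example runs without Traits.

```python
import asyncio


class Observable:
    # Hypothetical stand-in for a HasTraits object; it only mimics the
    # observe(handler, name, remove=...) call shape used above.
    def __init__(self):
        self._handlers = {}

    def observe(self, handler, name, remove=False):
        handlers = self._handlers.setdefault(name, [])
        if remove:
            handlers.remove(handler)
        else:
            handlers.append(handler)

    def fire(self, name, event):
        for handler in list(self._handlers.get(name, [])):
            handler(event)


async def await_trait_change(obj, trait):
    future = asyncio.get_running_loop().create_future()

    def handler(event):
        # Ignore further changes that arrive before we unhook.
        if not future.done():
            future.set_result(event)

    obj.observe(handler, trait)
    try:
        return await future
    finally:
        obj.observe(handler, trait, remove=True)


async def main():
    obj = Observable()
    # Simulate a trait change arriving while we are waiting.
    asyncio.get_running_loop().call_soon(obj.fire, "foo", "new value")
    event = await await_trait_change(obj, "foo")
    return event, obj._handlers["foo"]


received, remaining = asyncio.run(main())
```

This assumes the change is fired on the event loop's own thread; setting a future's result from another thread would need loop.call_soon_threadsafe.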

I could also see a pattern (particularly for Event traits) where the trait events are instead yielded so you could do something like:

async for event in obj.yield_trait_changes("foo.bar"):
    # process the event
    ...

Being able to do this seems independent of asyncio handlers, and so could be implemented separately.
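A possible shape for the yielding variant is an async generator fed by an asyncio.Queue. Again this is only a sketch: yield_trait_changes is written as a free function, and Observable is a hypothetical stand-in for a HasTraits object so the example is runnable on its own.

```python
import asyncio


class Observable:
    # Hypothetical stand-in mimicking observe(handler, name, remove=...).
    def __init__(self):
        self._handlers = {}

    def observe(self, handler, name, remove=False):
        handlers = self._handlers.setdefault(name, [])
        if remove:
            handlers.remove(handler)
        else:
            handlers.append(handler)

    def fire(self, name, event):
        for handler in list(self._handlers.get(name, [])):
            handler(event)


async def yield_trait_changes(obj, trait):
    queue = asyncio.Queue()
    handler = queue.put_nowait  # each change event is queued as it fires
    obj.observe(handler, trait)
    try:
        while True:
            yield await queue.get()
    finally:
        obj.observe(handler, trait, remove=True)


async def main():
    obj = Observable()
    changes = yield_trait_changes(obj, "foo")
    loop = asyncio.get_running_loop()
    for value in (1, 2, 3):
        loop.call_soon(obj.fire, "foo", value)
    seen = []
    try:
        async for event in changes:
            seen.append(event)
            if len(seen) == 3:
                break
    finally:
        await changes.aclose()  # runs the generator's cleanup promptly
    return seen, obj._handlers["foo"]


seen, remaining = asyncio.run(main())
```

Two caveats with this approach: the observer is only hooked up when iteration starts, so changes fired before the first iteration are missed, and breaking out of the async for does not by itself close the generator, hence the explicit aclose().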

mdickinson commented 4 months ago

Being able to do this seems independent of asyncio handlers, and so could be implemented separately.

Yes, definitely.

mdickinson commented 3 months ago

Basic feature completed in #1771. We can open separate issues for follow-on features, if desired.