groove-x / trio-util

Utility library for the Python Trio async/await framework
https://trio-util.readthedocs.io/
MIT License
68 stars 6 forks source link

AsyncDict vs. cancellation #4

Closed smurfix closed 4 years ago

smurfix commented 4 years ago

AsyncDict doesn't adequately cover the "send a request, wait for a reply" pattern: if the waiter is cancelled before the reply arrives, that reply will languish in the dict indefinitely. Storing the reply can't depend on is_waiting either, because it could arrive before the async request part completes.

Fixing this probably requires an async context manager.

takluyver commented 4 years ago

I thought of an abstraction that could have avoided at least one issue I stumbled into:

# Task 1 - making the request
async with foo.expecting(request_id) as find_reply:
    conn.send(request_id, data)
    return await find_reply()

# Task 2 - processing incoming replies
foo.fulfil(request_id, reply_data)

i.e. the .expecting() context manager would make a slot of some kind for the expected reply, and clean that slot up when it exits, whether it was fulfilled or not.

smurfix commented 4 years ago

Exactly what I was thinking, though you probably don't need an async context manager for that.

belm0 commented 4 years ago

this is supposed to be a mere dictionary, with no policy about keys

if we integrate this context manager, it would need a policy about not reusing keys-- which is strange for a dictionary

I wonder if this context manager can be built on top of AsyncDictionary.

belm0 commented 4 years ago

if the waiter is cancelled before the reply arrives, that reply will languish in the dict indefinitely

i.e. the .expecting() context manager would make a slot of some kind for the expected reply, and clean that slot up when it exits, whether it was fulfilled or not

I think there is also the case where the reply never comes, e.g. if the request was lost in transit. The worst case is where a placeholder was created, the request was issued over the network, the caller was cancelled, and then the request was dropped (perhaps due to unavailability of the server). So in a simple implementation that can still result in metadata existing indefinitely.

The context manager is probably best integrated into the request client, which knows details of the request lifetime and reply state.

takluyver commented 4 years ago

If the task making the request is cancelled, then leaving the .expecting() context manager should clean up its 'reply slot'. If there's no timeout or cancellation, then it won't clean up, but that's not really about this API, it's a general hazard of waiting for something that may never happen.

The general bit I missed when trying to do this was the 'reply slot', aka what other frameworks call a future. With that, it's easy to make on top of a plain dict.

belm0 commented 4 years ago

If there's no timeout or cancellation, then it won't clean up, but that's not really about this API, it's a general hazard of waiting for something that may never happen.

But even when the requesting task is cancelled, the dict needs to remember that there is say an outstanding request already on the network, and to ignore the reply. I'm suggesting it's better managed by the code that is writing to the AsyncDictionary, rather than the AsyncDictionary itself.

smurfix commented 4 years ago

the dict needs to remember that there is say an outstanding request already on the network, and to ignore the reply

Why should it? The client shall be required to use the context manager, which remembers all non-cancelled requests. Thus, any reply that arrives with an unknown key can safely be ignored.

In any case, the pattern I commonly use for async clients looks like this:

requests = dict()
…
async def send_request(data):
    key = gen_key()
    requests[key] = evt = trio.Event()
    try:
        await send("req",key,data)
        await evt.wait()
        return requests[key]
    finally:
        del requests[key]
…
async def receive_loop():
    while True:
        key,reply = await recv_message()
        try:
            evt = requests[key]
        except KeyError:
            pass
        else:
            requests[key] = reply
            evt.set()

(Bonus points for sending cancellations to the server when a single request is cancelled, packing results in an Outcome so that you can raise a client error when the request errored out, sending errors if the reader dies, and whatnot.)

This code doesn't need an AsyncDictionary, and in fact cannot benefit from one that doesn't have a context manager.

belm0 commented 4 years ago

I've come around to understanding the value of it, thank you 👍

I wonder if AsyncDictionary can cover it by only adding this simple expecting() context manager:

replies = AsyncDictionary()

# send request
with replies.expecting(key):
    await ...  # some request causing `key` to eventually appear
    result = await replies.pop_wait(key)

# receive loop
async for key, result in channel:
    if replies.is_waiting(key):
        replies[key] = result
takluyver commented 4 years ago

So is_waiting(k) would return True if a task is waiting in get_wait(k) or pop_wait(k), or if a task is currently inside the with x.expecting(k) block?

I think that works, though it would be easy to forget the .expecting() block - 99% of the time the sending task will get to pop_wait() before the receiving task checks is_waiting(), so you could accidentally rely on that. The workaround I've written for Jeepney forces you to use the .expecting() method as part of retrieving the key.

takluyver commented 4 years ago

Would it make sense, rather than adding to AsyncDictionary, to define a more restrictive 'expecting' async dictionary (ExpectationManager :stuck_out_tongue: ), which would guarantee:

belm0 commented 4 years ago

I think that works, though it would be easy to forget the .expecting() block

The same is true for the receive loop checking replies.is_waiting(key). But I assume this is fairly low-level code and a knowledgeable person is writing both the send request and receive loop. In my use case of a multiplexing client, each of these appears only once.

It's attractive because it fits within the existing AsyncDictionary API, and the use is optional. (I suspect there are valid uses of AsyncDictionary without expecting() or is_waiting() but can't yet prove it.)

belm0 commented 4 years ago
replies = AsyncDictionary()

# send request
with replies.expecting(key):
    await ...  # some request causing `key` to eventually appear
    result = await replies.pop_wait(key)

# receive loop
async for key, result in channel:
    if replies.is_waiting(key):
        replies[key] = result

There is still a race here if the send task is cancelled just as the replies entry is inserted.

~So I'd have to also change the semantics of pop_wait() to always delete the key on exit.~

The request code would need to delete the entry in a manual finally, or expecting() itself would need to be responsible for deleting the key.

--> rename expecting() to expect_and_finalize()

smurfix commented 4 years ago

I wouldn't rename that. To me the semantics of expecting state clearly enough that when the block exits there is no longer anything expecting a reply.

Personally I'd structure the API differently:

async with replies.expecting(key) as result:
    await generate_the_request(key)
process_reply(result.value)

i.e. the async context returned by expecting would have an __aexit__ that's responsible for waiting for the reply and for storing the result in result.value. If the receive loop gets an error / is cancelled, accessing the value should raise the corresponding exception.

I wouldn't call this class a dictionary because you'd never use replies[key]. Getting is going to be mediated by the async context, or pop_wait(). Setting the actual value must also be async because AnyIO doesn't have a sync way to trigger an event.

belm0 commented 4 years ago

I think I'm ready to give up on AsyncDictionary:

  1. though I suspect synchronous set/get and blocking get/pop might have some use cases-- particularly, the blocking get can be used for broadcast-- I don't have one myself. And a principle of trio-util is for things to actually be used (particularly by me, so I can understand the API space).
  2. though I prototyped support for the context manager on top of the existing API, the result is less than simple in terms of API and implementation. And I don't want to carry complexity due to the part of the API that doesn't have a proven use case (see 1).

So the next question is, is a multiplexed request/reply utility a good value for trio-util? A few points causing me to not be too thrilled about it at the moment:

smurfix commented 4 years ago

IMHO the pattern is simple enough, but as you noted the devil is in the detail and implementation requirements differ wildly. For instance, if that request was to an SQL server the first thing I'd do is to implement sending a kill command to the server when somebody cancels a request on the client.

My simple ValueEvent has the same problem: the code responsible for generating the value might want to be cancelled when the client decides that it no longer wants the result. I tried to solve that by adding an optional scope argument. While that enables a possible workaround it's still not a good fit to Structured Concurrency patterns and doesn't even try to propagate cancellations both ways.

belm0 commented 4 years ago

AsyncDictionary has been removed from the package

milahu commented 9 months ago

for reference

class AsyncDictionary was removed in https://github.com/groove-x/trio-util/commit/f884cf8376feaf54701eabc0c758accbd53f48f3

last version of class AsyncDictionary

use case

handle dependencies between events that arrive in random order

when a dependency is missing, the event handler waits until another event handler adds the dependency or fails on timeout

see also

python asyncio asynchronously fetch data by key from a dict when the key becomes available

async_dict