miracle2k opened this issue 6 years ago
Yeah, this is tricky.
So one problem is that we aren't communicating the issues here well (partly because I've struggled to understand them myself!). My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block. Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated: they assume that between when you enter the block and when you exit the block, you only run code that's inside the block. Here "inside" means: if an exception is raised, the block will have a chance to catch it. In particular, consider what happens if you have a background task, and it crashes at a moment when the generator is suspended – trio will try to cancel all the code inside the nursery block, but it will get confused and end up cancelling whatever code is currently running in that task instead. So... I'm afraid your example code above is not really fixable.
We might be able to handle this better in 3.8, maybe with some version of PEP 568; in fact while writing this I had an idea which I've added to #495 :-). But right now there's not a lot we can do except getting better at educating users.
We should definitely have a clear description of this issue and its consequences in the docs.
Also, you're right about the error message here: we should be able to detect this case (at least some of the time), and give a better error message. I think we can diagnose it reliably by detecting cases where we try to leave a cancel scope and it's not at the top of the cancel stack, or when a task exits with a cancel stack that's bigger than the one it started with.
My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block. Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated:
So since I am once again struggling with a generator-related issue (I'm now tending towards "avoid them"), let me ask for clarification. Until now I read this as meaning that as long as I am using aclosing, I will be fine. If I replace my example from the initial post with:
import trio
from async_generator import aclosing

async def gen_with_task():
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        while True:
            await trio.sleep(1)
            yield 1

async def main():
    iterable = gen_with_task()  # THIS IS NEW
    async with aclosing(iterable):  # THIS IS NEW
        async for f in iterable:
            raise ValueError()

trio.run(main)
Then things appear to be going well. But you write:
Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated
Which would imply that using aclosing() is not enough?
My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block.
Does that mean I should not be doing what I am doing above, even with aclosing? A generator function should never start a nursery, and even though this particular example seems to work now, there might be other side-effects? Or is there some way to make it work using @contextmanager/@asynccontextmanager?
So the case I find myself struggling with is this:
import trio
from async_generator import aclosing

async def fail_soon():
    raise ValueError()

async def subscribe():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(fail_soon)
        while True:
            yield 1

async def main():
    events = subscribe()
    async with aclosing(events):
        async for event in events:
            # XXX
            await trio.sleep(1)

trio.run(main)
This causes a trio.MultiError: ValueError(), GeneratorExit(). My problem is the GeneratorExit that happens here. In my production case, I am actually seeing a MultiError with only GeneratorExit exceptions, but I haven't been able to reproduce that.
As far as I can tell, this is what happens:
1. main is waiting at the XXX mark, awaiting the sleep to return.
2. With main doing nothing, trio looks for other work. How about starting the fail_soon task?
3. fail_soon raises a ValueError, which causes the nursery that was created within the generator to be marked as cancelled.
4. The await at XXX belongs to the scope of the generator's nursery, I think, so now the await, being a checkpoint, fails and raises Cancelled.
5. The Cancelled exception bubbles up to aclosing's __aexit__. I verified that it receives a Cancelled exception. It calls events.aclose().
6. Somehow this leads to a GeneratorExit. I'm quite curious how that exception comes to be, and why it is not hidden from me in this case.
aclosing() hits the subscribe() generator with a GeneratorExit exception, and will ignore that exception when it gets it back. However, your subscribe doesn't (re-)raise that GeneratorExit – it raises a MultiError with a GeneratorExit inside, but Python doesn't introspect that (obviously).
Fix: ignore that error:
async def subscribe():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(fail_soon)
        try:
            while True:
                yield 1
        except GeneratorExit:
            return
Writing a context manager which does the same thing is left as an exercise to the reader. ;-)
Does that mean I should not be doing what I am doing above, even with aclosing? A generator function should never start a nursery, and even though this particular example seems to work now, there might be other side-effects?
Correct :-(
(To be totally precise: a generator function can use a nursery – e.g. it's OK to call trio.open_tcp_stream, which uses a nursery internally for the happy eyeballs stuff – but it shouldn't yield inside the nursery block.)
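A minimal illustration of that distinction (made-up generators, with import trio assumed):

import trio

# Fine: the nursery opens and closes entirely between yields
async def gen_ok():
    while True:
        async with trio.open_nursery() as n:
            n.start_soon(trio.sleep, 0)
        # the nursery has fully exited before control leaves the generator
        yield 1

# Wrong: the generator is suspended while the nursery is still open
async def gen_bad():
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        yield 1  # the nursery block is suspended across this yield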
@smurfix is right about what's happening with the GeneratorExit error, but I wouldn't recommend using that workaround... it doesn't address the underlying issue. In particular, consider what happens when a background task inside the nursery crashes, at a moment when you're blocked inside the body of the for loop. Trio will try to deliver a cancellation to the blocked function – even though it's not inside the generator! – because Trio has no way to know that you've yielded out of the nursery; it just knows that you've entered the nursery, and haven't exited it yet. This Cancelled exception is supposed to be caught by the nursery block... but the nursery block is inside the generator, and the Cancelled exception is outside, so the nursery block can't catch it. Instead it propagates all the way out and crashes your program.
There is good news though: you can make this API work basically the same as before; just, instead of using aclosing as a context manager, you'll want to define a custom context manager that handles the nursery. That's totally fine. So, something like:
@asynccontextmanager
async def subscribe():
    events = await set_up_subscription()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(background_task)
        async def events_gen_function():
            while True:
                yield get_something_from_background_task_or_whatever()
        async with aclosing(events_gen_function()) as events:
            yield events

# Usage:
async with subscribe() as events:
    async for event in events:
        ...
So the pattern is: use an @asynccontextmanager to set up the nursery and start any background tasks, then from it, yield an async iterable. That async iterable can now trust that the background task is running.
I guess we could have an @async_generator_with_nursery decorator that does some hacky thing where it makes it so that (a) you have to use async with function() as async_iterable: ... instead of calling it directly, and (b) if the code inside the async with block crashes, the exception gets thrown into the suspended generator. (And if it doesn't crash, then GeneratorExit gets thrown in.) So it'd be like a super-weird hybrid of a context manager generator and a regular iterable generator.

This idea violates all kinds of conventional rules and taste, but maybe it'd be pragmatic and useful anyway? idk
Here’s one more example where one might want this (adapted from my recent code):
from trio import Queue, open_nursery

async def collect(*awaitables):  # essentially the same thing as asyncio.as_completed()
    results = Queue(0)
    async def perform(a):
        await results.put(await a)
    async with open_nursery() as nursery:
        for a in awaitables: nursery.start_soon(perform, a)
        while nursery.child_tasks: yield await results.get()
I also had a similar function, merge(), for a list of async iterators (essentially aiostream.merge). But the point here is not the functions, but the fact that these are actual, useful combinators. Not only is it impossible to write them nicely as above, but, if we keep the interface, it’s impossible to write them in trio at all. Of course, we could make the nursery an argument of collect()/merge(), but this looks like a leaky abstraction to me (especially given that race(), which is like collect() but only returns the first result, does not need a nursery argument).
Side note: I understand the desire for minimal APIs, but trio needs more combinators like these. It has lots of useful stuff, but I want to be able to talk about it in my programs. (Similarly, str does not strictly need str.split(), but it would be infuriating to write it from scratch every time.)
For the purposes of this comment, a “thread” will mean anything with a separate (Python) instruction pointer (whether or not it has a stack, is associated with a trio task or an OS thread, etc.).
@njsmith says:
So one problem is that we aren't communicating the issues here well (partly because I've struggled to understand them myself!)
Semantically speaking, what cancel scopes and (to some extent) nurseries implement is dynamic scoping. For example, I see the cancel scopes section of @njsmith’s essay as essentially advocating passing the cancel token in a dynamically scoped variable (Common Lisp “special variable”, Clojure “var”, etc.) instead of an explicit argument.
What’s relevant about dynamic scope here is that when a thread is created, it should (generally) inherit it from its parent. AFAIU, this is indeed what trio does when a child task is created (at least wrt cancel scopes). However, Python provides one more way (!) to create a thread: create a generator (whether async or not). It seems that trio doesn’t (or is unable to) handle this at all, and so it all comes crashing down.
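(A toy, non-trio example of the "generator as thread" point: a generator's frame has its own instruction pointer, which advances independently of its caller's.)

def gen():
    print("step 1")
    yield
    print("step 2")

it = gen()
next(it)           # runs gen's frame up to the yield: prints "step 1"
print("caller")    # meanwhile the caller's own instruction pointer continues
next(it, None)     # resumes gen's frame where it left off: prints "step 2"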
If I understand contexts, context vars and PEP 568 correctly (... :-/ ), a correct dynamic variable (suitable for e.g. a cancel token), given PEP 568, is simply something like the following ...
from contextlib import contextmanager
from contextvars import ContextVar

class Dynamic:
    def __init__(self, name):
        self._value = ContextVar(name)
        self._stack = []

    @property
    def value(self):
        return self._value.get()

    @contextmanager
    def __call__(self, value):
        self._stack.append(self._value.set(value))
        try:
            yield
        finally:
            self._value.reset(self._stack.pop())

if __name__ == '__main__':
    v = Dynamic('v')
    with v('spam'):
        print(v.value)  # spam
        with v('eggs and'):
            print(v.value)  # eggs and
        print(v.value)  # spam
... which really should be in the standard library.
Yeah, what cancel scopes need is something like dynamic scoping (or actually slightly more powerful – scopes can be nested, so we need to be able to fetch all the values on the stack, not just the latest value). That was a major motivation for writing PEP 568.

But PEP 568 hasn't been accepted or implemented, so we can't actually use it currently. And the issues with nurseries are actually worse: if you can yield out of a nursery block, you can actually recreate the equivalent of a go statement by mistake. Currently the best overview of how all these things relate is this comment: https://github.com/python-trio/trio/issues/264#issuecomment-418989328
At the Python sprint last week, Yury (the main asyncio maintainer) and I talked about this quite a bit, since asyncio has the same problem now that it's adding nurseries. The best we were able to come up with was to add a way for a context manager to forbid yields inside it. Though neither of us is overjoyed about it.
Regarding your collect: yes, this means that you can't implement that operation with that API. Notice that that API is unsafe, though, in the "structured concurrency" sense: if you stop iterating half-way through and abandon the generator, then there will be orphaned tasks left dangling. What we can support, though, is an API like:

async with collect(...) as results:
    async for result in results:
        ...

Here the async with makes it safe for collect to create an internal nursery.
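For concreteness, here is a minimal sketch of what such a collect could look like, assuming trio's memory channels and clone() are available – an illustration of the shape, not an official implementation:

import trio
from async_generator import asynccontextmanager  # contextlib.asynccontextmanager on 3.7+

@asynccontextmanager
async def collect(*awaitables):
    send, receive = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        async def perform(a, chan):
            async with chan:  # each task closes its own clone when done
                await chan.send(await a)
        async with send:  # close the original once all clones are handed out
            for a in awaitables:
                nursery.start_soon(perform, a, send.clone())
        try:
            # 'async for result in receive' ends once every send clone is closed
            yield receive
        finally:
            # if the consumer stops early, cancel the remaining tasks
            # instead of leaving them orphaned
            nursery.cancel_scope.cancel()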
trio needs more combinators like these
Agreed. I want to get #586 finished first (so the results object in the example above can be a Channel), but then after that I think this is a high priority. Though tentatively my plan is to make it a separate library at least to start, and with some more substantial features, like built-in capacity management ("don't run more than 10 tasks at once", etc.).
By the way, it’s unclear to me how either PEP 568 or forbidding yield inside generators could accommodate creating nurseries inside context managers using contextlib.asynccontextmanager, like that mentioned in #569.
@alexshpilkin yeah, that's one of the major sources of complexity here. The idea is that we would keep a stack of Contexts (in PEP 568) or "is yield allowed" flags, and by default push/pop them when entering/exiting a generator, and yield would check this stack before yielding. BUT, we'd also have a mechanism to disable this push/pop/check mechanism, by setting some flag on the generator object, and the contextmanager decorator function would set this flag. So the intuition is that contextmanager would have some special-case code to tell the interpreter that its generator isn't really a separate context, and a yield inside its generator isn't really a yield.
(There is a bit of discussion of this in PEP 568, if you search for contextmanager.)
Agreed. I want to get #586 finished first (so the results object in the example above can be a Channel), but then after that I think this is a high priority. Though tentatively my plan is to make it a separate library at least to start, and with some more substantial features, like built-in capacity management ("don't run more than 10 tasks at once", etc.).
See https://github.com/python-trio/trimeter
That's not really a drop-in replacement for async generators, though, it's a higher-level thing.
Here's a sketch for a possible alternative to async generators, that tries to keep a lot of the ergonomics that makes them attractive in the first place, while allowing background tasks and such: https://gist.github.com/njsmith/4db568255a276d4c7cf8a9a6b4295348
@miracle2k's original example using this:
# Add decorator, and send_channel argument
@producer
async def gen_with_task(*, send_channel):
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        while True:
            await trio.sleep(1)
            # Replace 'yield X' with 'await send_channel.send(X)'
            await send_channel.send(1)

async def main():
    # Now have to use 'async with' to get an async iterator
    # (actually a full-fledged ReceiveChannel)
    async with gen_with_task() as aiter:
        async for f in aiter:
            raise ValueError()
I really like the ergonomics-per-implementation-complexity-unit of @producer. I think the version as listed won't quite work -- the async with send_channel: needs to be done inside the nursery task, else the channel won't be closed (and the iterating async for thus won't terminate) until the context is exited, which is too late.
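A minimal sketch of a producer decorator along those lines – hypothetical, not the code from the gist – where the task running the wrapped function is the one that closes the channel (functools, trio, and asynccontextmanager imports as in the adapter code below):

def producer(wrapped):
    @asynccontextmanager
    @functools.wraps(wrapped)
    async def wrapper(*args, **kwargs):
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            async def run():
                # closing the channel here, when the producer finishes,
                # is what lets the consumer's 'async for' terminate
                async with send_channel:
                    await wrapped(*args, send_channel=send_channel, **kwargs)
            nursery.start_soon(run)
            try:
                yield receive_channel
            finally:
                # tear down the producer if the consumer exits early
                nursery.cancel_scope.cancel()
    return wrapper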
I also played around with supporting real async generators, and I think it's possible to do so robustly:
import functools
import sys

import trio
from async_generator import aclosing, asynccontextmanager

def producer(wrapped):
    @asynccontextmanager
    @functools.wraps(wrapped)
    async def wrapper(*args, **kwargs):
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            async def adapter():
                async with send_channel, aclosing(wrapped(*args, **kwargs)) as agen:
                    while True:
                        try:
                            # Advance underlying async generator to next yield
                            value = await agen.__anext__()
                        except StopAsyncIteration:
                            break
                        while True:
                            try:
                                # Forward the yielded value into the send channel
                                await send_channel.send(value)
                                break
                            except BaseException:
                                # If send_channel.send() raised (e.g. Cancelled),
                                # throw the raised exception back into the generator,
                                # and get the next yielded value to forward.
                                value = await agen.athrow(*sys.exc_info())
            nursery.start_soon(adapter, name=wrapped)
            async with receive_channel:
                yield receive_channel
    return wrapper
This is still technically suspending an async generator while it has nurseries/cancel scopes open, but the task that's iterating the async generator doesn't exit or enter any nurseries or cancel scopes while the async generator is suspended, and the exception forwarding effectively puts send_channel.send() into the same context as the async generator's yield. It seems to work well in practice, and supports things like putting a timeout around a yield.
@producer
async def squares_in_range(low, high):
    try:
        for i in range(low, high):
            with trio.move_on_after(0.5):
                yield i ** 2
    finally:
        print("unwinding")

async def do_it():
    async with squares_in_range(0, 100) as sqiter:
        async for square in sqiter:
            print(square)
            await trio.sleep(0.01 * square)
            if square >= 400:
                raise RuntimeError("kaboom")
Oo, neat idea.
I have a tentative intuition that we do want something like the .raw invocation, in particular to let you use non-default channel types. That can obviously be added to the async generator version though.
One thing I'm not sure of: suppose send raises an exception (like BrokenResourceError). With the explicit stream-passing version, it's obvious what happens: the code calls a stream method, that method can raise, it can handle it, etc. With the async generator version it's a little less obvious what should happen. In the code above, I think an exception from send causes the yield to raise GeneratorExit, and then after the GeneratorExit finishes unwinding the agen, it gets caught and the original exception keeps going. The alternative would be to take the exception that send raised, and throw it into the agen, so the yield acts exactly like a call to send.

On the one hand, it's kind of surprising to have yield raise BrokenResourceError. (Especially for people who aren't familiar with all the intricacies of generators, which I think is most people.) But it's also nice to have the traceback show the agen frame being cleaned up, and to give the agen a chance to respond to the exception.
Another thing I'm not sure of: the version that passes in a channel can call any method on the channel, including e.g. send_nowait or statistics or whatever. With the async generator version, yield is the equivalent of send, and there's no way to access the stream object itself. Does this matter?
The alternative would be to take the exception that send raised, and throw it into the agen, so the yield acts exactly like a call to send.
I believe the code above does in fact do that -- is there something I'm missing? The rethrowing is necessary to ensure that if a cancel scope inside the generator causes the send to be cancelled, the Cancelled exception passes through that cancel scope as it unwinds.
For what it's worth, I think the ergonomics of the generator solution might be further improved by suppressing BrokenResourceError explicitly on the call to send() in adapter, since generally when someone exits the generator context it can be assumed that they want the generator to be unwound. Otherwise you will pretty much always get BrokenResourceError raised out of the caller's async with block if the async for block is exited before the async generator is exhausted. This is probably open to debate though.
I agree that the version that passes send_channel explicitly is more flexible and arguably clearer. ("Explicit is better than implicit", etc.) On the other hand, the version that adapts an async generator is an easier fit for people who go "why does my async generator [that uses cancel scopes/nurseries] not work?", and will provide somewhat clearer support for yield from in async generators if CPython ever gets that. It's also only really possible to suppress BrokenResourceError in the async generator version, and I think the inability to suppress it will be bug-prone in the explicit channel version.
It should be feasible to write adapters that go in either direction, so I don't think it matters too much which one we decide should be "primitive".
We would probably also want to cancel the nursery inside wrapper() when the context gets exited. Closing the receive channel will deal with cleanup when wrapper() is suspended inside send_channel.send(), but won't help if it's suspended somewhere else (i.e. in some await in the async generator). We could optionally decide that this cancellation is always the way we clean up, and not close the receive channel until everything gets torn down. That would allow more parity between the asyncgen and explicit-channel implementations, and remove the BrokenResourceError trap.
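Concretely, that might look something like this sketch, replacing the tail of the wrapper() shown earlier (same names as the adapter code above):

            nursery.start_soon(adapter, name=wrapped)
            try:
                async with receive_channel:
                    yield receive_channel
            finally:
                # cancel the adapter task (and thus the wrapped generator),
                # no matter where it is currently suspended
                nursery.cancel_scope.cancel()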
I believe the code above does in fact do that -- is there something I'm missing?
Oh whoops, no, I just read too quickly on my phone :-).
The rethrowing is necessary to ensure that if a cancel scope inside the generator causes the send to be cancelled, the Cancelled exception passes through that cancel scope as it unwinds.
Ah yeah. That's a compelling argument against the other option, where we don't re-throw. Though it also makes me a little more dubious about the whole thing... especially since I'm trying to convince Yury that we should extend the interpreter to disallow yield inside cancel scopes! (Reasoning is here.)
For what it's worth, I think the ergonomics of the generator solution might be further improved by suppressing BrokenResourceError explicitly on the call to send() in adapter, since generally when someone exits the generator context it can be assumed that they want the generator to be unwound. Otherwise you will pretty much always get BrokenResourceError raised out of the caller's async with block if the async for block is exited before the async generator is exhausted. This is probably open to debate though.
That's true; when people abandon a for loop they don't generally expect the generator to get annoyed at them about that.
We could optionally decide that this cancellation is always the way we clean up, and not close the receive channel until everything gets torn down. That would allow more parity between the asyncgen and explicit-channel implementations, and remove the BrokenResourceError trap.
Makes sense to me.
yield from
The .raw semantics are isomorphic to yield from, I guess. (.raw is a terrible name, we should come up with a better one. But I mean the thing my gist called .raw.)
Here I've implemented "suppressing BrokenResourceError explicitly on the call to send() in adapter". It seems to work. Please correct me if I've missed something.
def producer(wrapped):
    @asynccontextmanager
    @functools.wraps(wrapped)
    async def wrapper(*args, **kwargs):
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            async def adapter():
                async with send_channel, aclosing(wrapped(*args, **kwargs)) as agen:
                    user_exit = False
                    while not user_exit:
                        try:
                            # Advance underlying async generator to next yield
                            value = await agen.__anext__()
                        except StopAsyncIteration:
                            break
                        while True:
                            try:
                                # Forward the yielded value into the send channel
                                try:
                                    await send_channel.send(value)
                                except trio.BrokenResourceError:
                                    user_exit = True
                                break
                            except BaseException:
                                # If send_channel.send() raised (e.g. Cancelled),
                                # throw the raised exception back into the generator,
                                # and get the next yielded value to forward.
                                value = await agen.athrow(*sys.exc_info())
            nursery.start_soon(adapter, name=wrapped)
            async with receive_channel:
                yield receive_channel
    return wrapper
It doesn't appear to handle ~Cancelled~ aborting the yield as advertised, however:
@producer
async def squares_in_range(low, high):
    try:
        for i in range(low, high):
            with trio.move_on_after(0.5):
                yield i ** 2
    finally:
        print("unwinding")

async def test_producer_cancelled():
    async with squares_in_range(0, 50) as squares:
        async for _ in squares:
            await trio.sleep(1)
RuntimeError: async generator raised StopAsyncIteration
I've fixed the StopAsyncIteration issue based on a hint here: https://github.com/agronholm/asyncio_extras/issues/2
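My reading of that hint, as a sketch against the adapter above (the trio-util PR below has the tested version): when the exception thrown in via athrow() makes the generator finish, athrow() itself raises StopAsyncIteration, which must not be allowed to escape:

                            except BaseException:
                                try:
                                    value = await agen.athrow(*sys.exc_info())
                                except StopAsyncIteration:
                                    # the generator exited while handling the
                                    # thrown-in exception; stop forwarding
                                    user_exit = True
                                    break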
At this point there are several references to the async generator adapter approach in this issue -- there really should be a working, tested implementation. I'll make a PR for trio-util and open it for review.
trio-util @trio_async_generator: https://github.com/groove-x/trio-util/pull/9
I am aware of the issues with cleaning up async iterators, PEP 533, and the need for aclosing. However, if aclosing is not used, which I frequently overlook, things can really blow up in your face:
gives me:
Note that in addition to the internal trio exceptions, the original cause is not shown at all. This happens when the async generator had a task running in a nursery.
Maybe there is no way to handle this better - I can't tell, this is above my paygrade! But in case there is a way to show a clearer error message, I think it might be worth it.