njsmith opened this issue 7 years ago
For reference, here's a minimal Queue that lets you select on `get`. The design would also support selecting on `put` with some small tweaks. Possibly useful for extracting more general patterns out of? I believe that the design there is actually fair and could be extended to implement the full Queue API, and would even be faster than what we currently have. (It's based on ideas I've been playing with in the context of #272.)
Maybe the key idea I needed to make the code in the previous comment work is: you need to make the actual operation happen synchronously with the reschedule, because:

- you have to unregister the task everywhere synchronously with the call to reschedule (not with when the thing actually wakes up), otherwise you might get rescheduled twice (cf. #315)
- if you want to support fairness, then you can't re-register after unregistering
- which means that you can't unregister unless the operation is completed
- therefore, the operation completion has to happen synchronously with the call to reschedule.
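A toy sketch of the invariant this is describing (all names here are hypothetical illustration, not Trio's API): when the completer commits the operation, it records the result and unregisters the waiter from every queue in one synchronous step, so a second completer can never find it and wake it twice.

```python
class Waiter:
    """Toy waiter that can be parked in several queues at once (a 'select')."""

    def __init__(self):
        self.result = None
        self.completed = False
        self.queues = []  # every queue we are registered in

    def complete(self, value):
        # The whole operation commits here, synchronously: record the
        # result AND unregister everywhere, in a single step. No window
        # exists in which another completer could wake us a second time.
        assert not self.completed, "would be a double wakeup"
        self.completed = True
        self.result = value
        for q in self.queues:
            q.remove(self)
        self.queues.clear()

queue_a, queue_b = [], []
w = Waiter()
w.queues = [queue_a, queue_b]
queue_a.append(w)
queue_b.append(w)

w.complete("from a")
# A later completer scanning queue_b can no longer find w:
assert w not in queue_b and w.result == "from a"
```

If instead the unregistration happened later, when the task actually woke up, both queues could hand the waiter a value in the meantime, which is exactly the double-reschedule problem above.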
We've already moved in this direction for `Lock` because it makes condition variables simpler, and we'll probably go this way for `Queue` too regardless. So maybe it's a good general way to do things? And then there are primitives like `Event`, `CapacityLimiter`, and `Semaphore` that basically work this way already, because there isn't really anything to do on the wakeup path anyway.
OTOH, there are also operations where you simply can't perform the operation synchronously at reschedule time: Unix socket operations are like this, because it's possible for the `select`-equivalent to report a socket ready, but then get EWOULDBLOCK anyway, and you just have to wait again. I guess we could support this by changing the abstraction boundary between `trio.socket` and `trio.hazmat`, so that instead of just waiting for a socket to be readable/writable, you could actually pass through an operation for the IOManager to attempt to perform before waking up the task?
Are there other operations like this? IOCP is a whole other issue, but with IOCP it's literally impossible to get exactly-one-of-these semantics no matter what you do, so I guess we can write that off. I don't know that anyone actually cares about the ability to do exactly-one-of-these on sockets anyway. It's stuff like `Queue`s where this generally comes up. (Though we probably want `accept_nowait`, so at least that might be good to fit into this pattern if this becomes The Way To Do `nowait` Functions. On the other other hand, forcing every `Listener` class to do this may be too far to go in search of a pointless consistency.)
@jchayat in chat described a use case they have for `select`-style semantics: https://gitter.im/python-trio/general?at=59b03698c101bc4e3a987289
More discussion: https://gitter.im/python-trio/general?at=59b07c66b16f26464212fe0c
It sounds like at least in this case, a multi-task strategy is a viable alternative, though @jchayat still feels:

> it's much more natural to think about it as 1 sequential task with "select"
There are some fun error cases to handle.
- If some wait setup call errors, then you have to go and abort all the previous wait setup calls to unwind.
- One way a wait setup call can error is if the same task is trying to wait twice on the same event, e.g. trying to simultaneously put two items into the same queue. Or how should this be handled? The annoying thing is that if we just iterate through the `nowait` versions first, and one of them succeeds, then we'll never notice that the two actions conflict. OTOH it seems difficult to make this succeed for the `wait` case, because who wants to deal with the complications and overhead of using a `MultiDict` to keep track of waiting tasks? I guess the intermediate option would be to silently throw one of them away, but making this work with the abort logic might be tricky.
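For the "same task waits twice on the same event" case, the cheap alternative to a `MultiDict` is to key the wait queue by task and fail loudly on duplicate registration. A minimal sketch (the `WaitQueue`/`register` names are hypothetical, not Trio's):

```python
class WaitQueue:
    """Toy wait queue with one slot per waiting task."""

    def __init__(self):
        self._waiting = {}  # task -> resume callback

    def register(self, task, callback):
        if task in self._waiting:
            # Same task trying to wait twice on the same queue: raise
            # during setup, instead of silently dropping one waiter.
            raise RuntimeError(f"task {task!r} is already waiting here")
        self._waiting[task] = callback

q = WaitQueue()
q.register("task-1", lambda v: v)
try:
    q.register("task-1", lambda v: v)  # second wait from the same task
except RuntimeError as exc:
    error = str(exc)
assert "already waiting" in error
```

The downside, as noted above, is that this error only surfaces when both registrations actually reach the publish step; if a `nowait` fast path succeeds first, the conflict goes unnoticed.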
Oh, another fun case: if we want to support this for `ParkingLot`, and in particular `Condition`s, then we need some replacement for the `abort_func` reassignment hack that `ParkingLot.requeue` currently uses.
Some interesting arguments against `select` as a primitive:
Thanks for sharing those links, @njsmith. They have been quite revealing to me. The most insightful was indeed the Select statement considered harmful and the "Two Big Use Cases" idea. So simple, but so "demolishing" at the same time.
Of course, they are an attempt to over-simplify all concurrency problems under some general patterns... but that's exactly where their power resides!
Here's another place where a `nowait` might be useful. This time it's on `Stream.receive_some`: #536
I guess it'd actually be possible to have a "no wait" mode that could handle this in a generic way without going full concurrent-ML: basically it would be like cancellation, except that calls to `trio.hazmat.checkpoint()` would be allowed to continue. (Well, maybe we'd need something special for `wait_*_{readable,writable}`, where you check for readability/writability first before blocking? And for IOCP, we'd want to switch to non-blocking calls instead of regular IOCP calls...) That still seems less scary than trying to add full concurrent-ML select support to abstract interfaces like `Stream` and `Listener`.
In #586 (the PR for adding channels), we've run into an interesting problem that's actually closely related to this: `put_handle.put` blocks waiting for either someone to call `get_handle.get` or `put_handle.close`, and these need to be resolved atomically.
See: https://github.com/python-trio/trio/pull/586#issuecomment-414039117
It looks like the `accept_nowait` issue may be resolved by #636, so that may disappear as a motivating example here.
I've been working on incorporating some of the CML ideas into Trio, representing operations as (synchronous) generator functions that yield twice. (Ten-second overview: the part before the first yield is "attempt", between the first and second yields is "publish", and after the second yield is "unpublish". At the first yield the operation is sent a handle, which it arranges for another task to call `.complete()` or `.fail()` on while it's suspended at the second yield. It gets resumed with that value or error, or gets closed if the operation is cancelled/etc.) Any operation implemented in this way automatically supports blocking `await foo(...)`, nonblocking `foo.nowait(...)`, and `foo.operation(...)` (which returns a thing that can be wrapped, selected, etc.).
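To make the yield-twice protocol concrete, here is a deliberately tiny, synchronous driver for it. All names (`Handle`, `attempt`, `finish`, `take`, `waiters`) are hypothetical; the real branch's machinery also handles errors, cancellation, and select, which this sketch omits.

```python
class Handle:
    """Completion handle another party calls .complete() on."""

    def __init__(self):
        self.value = None
        self.done = False

    def complete(self, value):
        self.done = True
        self.value = value

def attempt(op_gen):
    """Run the 'attempt' phase: advance the generator to its first yield.

    Returns (result, None) if the operation finished synchronously (a
    successful nowait), or (None, handle) if it published itself and is
    now suspended waiting for handle.complete().
    """
    try:
        op_gen.send(None)          # run the code before the first yield
    except StopIteration as exc:   # returned without blocking
        return exc.value, None
    handle = Handle()
    op_gen.send(handle)            # "publish" phase: runs up to second yield
    return None, handle

def finish(op_gen, handle):
    """Resume at the second yield with the completion value."""
    try:
        op_gen.send(handle.value)  # runs the "unpublish" phase
    except StopIteration as exc:
        return exc.value

# An operation in this style: take from a buffer, or wait to be completed.
waiters = []

def take(buffer):
    if buffer:                     # attempt: succeed synchronously if we can
        return buffer.pop(0)
    handle = yield                 # first yield: receive our handle
    waiters.append(handle)         # publish
    try:
        return (yield)             # second yield: suspended until complete()
    finally:
        waiters.remove(handle)     # unpublish

# Synchronous success path (the nowait case):
result, handle = attempt(take([10, 20]))
assert result == 10 and handle is None

# Blocking path:
gen = take([])
result, handle = attempt(gen)
assert result is None and waiters == [handle]
handle.complete(99)
final = finish(gen, handle)
assert final == 99 and waiters == []
```

Note how the `finally` clause gives you the unpublish step for free in both the completed and cancelled (generator-closed) cases.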
So far I've reimplemented `ParkingLot.park`, `Event.wait`, `{Lock,Semaphore,CapacityLimiter}.acquire`, and `Condition.wait` on top of this, and all their existing tests pass. I haven't yet done memory channel send/receive or socket send/receive/accept, but I don't anticipate any problems supporting them. I don't plan to even try to support IOCP or generalized streams.
I'm pretty pleased with the ergonomics of this approach so far in terms of making it easy to write low-level operations that are robust:

- operations compose by delegating with `yield from`, which nicely answers https://github.com/python-trio/trio/issues/896
- it's like a friendlier, more composable, less flexible (because you can't refuse a cancellation) `wait_task_rescheduled`.
I'm still polishing it, but I'm wondering if this is something that Trio might be interested in having in the core, or whether I should target an external library instead? Having it in the core seems like a substantial force multiplier -- if it's in a library, you can only compose operations in that library or its dependencies, which probably winds up resulting in that library providing its own synchronization primitives that mostly reimplement Trio's core ones. On the other hand, it's a nontrivial chunk of functionality (`_core/_operation.py` is 418 lines right now) and demand for `select()` hasn't been that strong. Thoughts?
Some examples:
`ParkingLot.park()`:

```python
@_core.atomic_operation
def park(self):
    handle = yield
    # keep the current task so we can return it from unpark() --
    # but none of the _sync ops require it anymore
    self._parked[handle] = _core.current_task()
    handle.custom_sleep_data = self
    try:
        yield
    finally:
        del handle.custom_sleep_data._parked[handle]
```
`Event.wait()`: simple delegation

```python
@_core.atomic_operation
def wait(self):
    if self._flag:
        return
    else:
        yield from self._lot.park.operation()
```
`Condition.wait()`: delegation with an extra publish step and async cleanup

```python
@_core.atomic_operation
def wait(self):
    task = _core.current_task()
    if task is not self._lock._owner:
        raise RuntimeError("must hold the lock to wait")
    # NOTE: we go to sleep on self._lot, but we'll wake up on
    # self._lock._lot. That's all that's required to acquire a Lock.
    handle = yield self._lot.park.operation()
    self.release()
    try:
        yield
        self._lock._owner = task
    except:
        handle.add_async_cleanup(self.acquire)
        raise
```
Memory channel send/receive:

```python
class MemoryChannelState:
    ...

    @_core.atomic_operation
    def send(self, value):
        if not self.open_receive_channels:
            raise _core.BrokenResourceError
        if self.receive_ops:
            assert not self.data
            receive_handle = next(iter(self.receive_ops.keys()))
            receive_handle.complete(value)
        elif len(self.data) < self.max_buffer_size:
            self.data.append(value)
        else:
            send_handle = yield
            self.send_ops[send_handle] = value
            try:
                yield
            finally:
                del self.send_ops[send_handle]

    @_core.atomic_operation
    def receive(self):
        if self.send_ops:
            send_handle, value = next(iter(self.send_ops.items()))
            send_handle.complete()
            self.data.append(value)
            # Fall through
        if self.data:
            return self.data.popleft()
        if not self.open_send_channels:
            raise _core.EndOfChannel
        receive_handle = yield
        self.receive_ops[receive_handle] = None
        try:
            return (yield)
        finally:
            del self.receive_ops[receive_handle]

    ...
```
```python
# _closed changed from a bool to an Event

class MemorySendChannel:
    @_core.atomic_operation
    def _fail_when_closed(self):
        yield from self._closed.wait.operation()
        raise _core.ClosedResourceError

    @_core.atomic_operation
    def send(self, value):
        yield from _core.select.operation(
            self._fail_when_closed.operation(),
            self._state.send.operation(value),
        )

    ...

class MemoryReceiveChannel:
    @_core.atomic_operation
    def _fail_when_closed(self):
        yield from self._closed.wait.operation()
        raise _core.ClosedResourceError

    @_core.atomic_operation
    def receive(self):
        return (
            yield from _core.select.operation(
                self._fail_when_closed.operation(),
                self._state.receive.operation(),
            )
        )

    ...
```
Oh cool!
> I'm still polishing it, but I'm wondering if this is something that Trio might be interested in having in the core, or whether I should target an external library instead? Having it in the core seems like a substantial force multiplier -- if it's in a library, you can only compose operations in that library or its dependencies, which probably winds up resulting in that library providing its own synchronization primitives that mostly reimplement Trio's core ones. On the other hand, it's a nontrivial chunk of functionality (`_core/_operation.py` is 418 lines right now) and demand for `select()` hasn't been that strong. Thoughts?
I think this is too complicated a decision to make quickly :-). But having actual code will tell us a lot!
Some things I want to understand better before forming any conclusions:

- What does the `select` syntax look like? How do you tell which operation completed? What are the semantics for picking which operation if multiple are complete-able?
- Is `select` really useful? (1) If it were available, would people use it? (2) If they do, then does it make their code better or worse?
- What do the docs for writing these operations look like? How understandable are they? Do they give better ergonomics to implementors than what we have now?
- How does this affect fairness?
- What happens if someone passes the same operation twice to a single call to `select`?

...Looking at that list, some of those feel like they're going to need to cook for a while before we can draw conclusions. So maybe it makes most sense to put it in a library for now? It's true that if it does turn out to be a big win then having it in the core will act as a force multiplier, but we can probably learn things without that?
Cool, glad this seems interesting! I'm going to keep prototyping it as a PR to Trio because that gives a nice collection of ready-made examples (and tests!) of how it might simplify low-level operations, but I won't be sad if you want to hold off on reviewing/merging for a while/ever. :-)
> What does the select syntax look like? How do you tell which operation completed? What are the semantics for picking which operation if multiple are complete-able?
If you want to select between `await send_channel.send("foo")`, `await receive_channel_1.receive()`, and `await receive_channel_2.receive()`, you would write:

```python
result = await trio.hazmat.select(
    send_channel.send.operation("foo"),
    receive_channel_1.receive.operation(),
    receive_channel_2.receive.operation(),
)
```
`result` will be `None` (if the send won) or whichever value was received. For "which one was it?":
```python
@trio.hazmat.atomic_operation
def tag_with_self(operation):
    return (operation.args[0], (yield from operation))

@trio.hazmat.atomic_operation
def select_with_self_tag(*operations):
    tagged = [tag_with_self.operation(operation) for operation in operations]
    return (yield from trio.hazmat.select.operation(*tagged))

which_channel, result = await select_with_self_tag(
    # ... as above ...
)
```
Of course `select_with_self_tag` could be packaged for general use, or could be `select(..., tag="self")`, or whatever. There are a bunch of these combinators one could imagine; I've been focusing on the core functionality for now.

(Should these be in hazmat? Right now I have them in hazmat, because writing combinations of unevaluated operations is a little bit of a departure from trio's traditional worldview. So far `select` and the `@atomic_operation` decorator are the only two exported names.)
> Is select really useful? (1) If it were available, would people use it? (2) If they do, then does it make their code better or worse?
This is probably one of those questions that requires some time to marinate :-) but my intuition here is that `select()` will be a useful substitute for some reasonable set of things that currently get implemented as "start some tasks in parallel, cancel the others when the first one finishes; if another one finished too before it got your cancellation, awkwardly try to stuff that cat back into the bag".
Operations are also useful even without select, because of how the composition mechanisms let you run additional "unpublish" code synchronously with the reschedule/abort.
> What do the docs for writing these operations like this look like? How understandable are they? Do they give better ergonomics to implementors than what we have now?
Definitely a major open question! I'll see what I can come up with :-)
> How does this affect fairness?
Each operation (including each individual branch of a `select()`) gets its own completion handle, which can go in the wait queue of your choice. When one of them gets called, the others synchronously remove themselves. I don't understand some of the WFQ discussions well enough to know whether this system would compromise our ability to move in that direction, but it plays totally fine with the current strict-FIFO fairness.
> What happens if someone passes the same operation twice to a single call to select?
They both get published (in left-to-right order). Everything is indexed by the completion handle, not the task, and each branch of the `select()` gets a different completion handle. Whichever one gets completed first will synchronously unpublish the other one. So it's kind of a silly thing to do; the second copy of the operation doesn't add anything, but it's not going to totally blow up.
(All the things I'm calling "operation" are wrappers around the generator function plus its arguments, so there's no issue with calling the same one multiple times. This is needed to support retrying, to preserve BSD socket semantics. Internally there are also "opiters", i.e. generator iterators of operation functions, but those aren't exposed publicly.)
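The wrapper idea can be sketched in a few lines (the `Operation`/`opiter` names mirror the terminology above but the real internals differ): because the wrapper holds an unstarted generator function plus its arguments, every attempt gets a fresh generator iterator, which is what makes retrying and multiple publication safe.

```python
class Operation:
    """Unevaluated operation: a generator function plus its arguments."""

    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def opiter(self):
        # A fresh generator iterator per attempt, so the same Operation
        # can be retried (BSD-socket style) or passed to select() twice
        # without the two uses sharing generator state.
        return self.fn(*self.args, **self.kwargs)

def countdown(n):
    while n:
        yield n
        n -= 1

op = Operation(countdown, 3)
first = list(op.opiter())
second = list(op.opiter())  # independent iterator, same results
assert first == second == [3, 2, 1]
```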
Code is on the `operation` branch in my fork (https://github.com/python-trio/trio/compare/master...oremanj:operation). I'd encourage you not to spend too much time staring at `trio/_core/_operation.py` there, since it has almost no comments yet, but the examples of how it's used are probably interesting.
The `add_async_cleanup` thing is interesting. Does it run in all cases, or just if the operation is cancelled?
I was looking at this article again, and its running example for a novel selectable operation is `swap`, which in their implementation involves doing a blocking operation after committing to the operation. Their implementation is safe in practice because they have some abstraction boundaries that mean the other side is definitely also using `swap`, so as soon as we send our value we know they'll immediately send back their value, and it doesn't block in practice. But the type system doesn't guarantee it. So there's some question whether it's better to allow async operations here and just tell people to be really careful which ones they use, or to require the code be synchronous so it can't block.

`Condition.wait` is a total oddball in general, so I'm not sure how much it should affect our judgement. (Making it non-selectable also seems like an OK outcome.)
I think a closely related issue is how in classic concurrent ML, when you write a `select` you also specify some code to run on each possible branch. In simple cases, these are equivalent:
```python
# concurrent ML style
await select({
    send_channel.send.operation("foo"): lambda _: print("sent"),
    receive_channel.receive.operation(): lambda value: print(f"got: {value}"),
})

# prototype style
branch, value = await select_with_tag({
    send_channel.send.operation("foo"): "sent",
    receive_channel.receive.operation(): "received",
})
if branch == "sent":
    print("sent")
elif branch == "received":
    print(f"got: {value}")
```
But, they're different in a crucial case: in the first one, if you make it `select.operation(...)`, then you've created a new atomic operation that incorporates the `print` calls and everything. I think in concurrent ML this is the primary way that you compose new atomic operations?
The `yield from` trick is clever, but I wonder if it would be better to pay the syntactic tax to let `select` incorporate handlers for each branch, and use that instead. Another possible approach:
```python
select_builder = SelectBuilder()

@select_builder(send_channel.send.operation("foo"))
def handler(_):
    print("sent")

@select_builder(receive_channel.receive)
def handler(value):
    print(f"got {value}")

new_op = select_builder.finish()
```
...oh, I see, but then this is pretty awkward when you want to actually wrap this up into a new public API like your `MemoryReceiveChannel.receive`. I guess we'd need `@wrapped_atomic_operation` or something? Bleh.
Another random thought:

```python
await select(
    partial(send_channel.send, "foo"),  # or trio.withargs or something, maybe
    receive_channel.receive,
)
```
> The `add_async_cleanup` thing is interesting. Does it run in all cases, or just if the operation is cancelled?
It always runs if control flow reaches the `add_async_cleanup` call. You make the `add_async_cleanup` call in the "unpublish" phase of an operation function, though, so you can do it only on success or only on failure if you like. (If it wasn't clear from the examples, the operation result value/error is sent/thrown at the second `yield`.) Currently it's not possible to distinguish "the entire select() was cancelled" from "a different branch won" -- they both throw in GeneratorExit. I think that's probably good for composability.
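The GeneratorExit behavior being relied on here is plain Python generator semantics: closing a suspended generator raises GeneratorExit at the yield, and `finally` blocks still run, so unpublish happens whether the branch wins, loses, or the whole thing is cancelled. A minimal illustration (names are mine, not the prototype's):

```python
log = []

def op():
    log.append("publish")
    try:
        yield  # suspended here, waiting to be completed
    finally:
        log.append("unpublish")  # runs even when we lose the select

gen = op()
gen.send(None)   # advance to the yield
gen.close()      # another branch won, or the select was cancelled
assert log == ["publish", "unpublish"]
```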
The actual async cleanup functions run, shielded, in the woken task after it gets rescheduled. If there are multiple, the current implementation runs them in parallel in a nursery; maybe this should be one-at-a-time in the same order as the sync cleanups, but then it gets confusing that we run all the sync cleanups before any of the async ones. I added async cleanup mostly to support `wait()`. It could also support the `swap()` operation you linked, and could probably be shoehorned into supporting things that are "undoable but not immediately cancellable", like receiving a fixed-size message on a buffered stream (you push any excess part back into the buffer). I say "shoehorned" because the implementation would involve a system task, and I don't know if there's a use case that's worth it. (I haven't implemented that one and don't have any near-term plans to.)
I don't think async cleanup is too much of an attractive nuisance, given that it's kind of awkward to use (you have to define a separate async function; you can't just write `await` in your operation function). We definitely do need to be able to collect and run all the synchronous unpublish parts without checkpointing, so I don't think there's any way to let people write `await` in the operation function without an excess of foot-shooting.
> I think a closely related issue is how in classic concurrent ML, when you write a `select` you also specify some code to run on each possible branch.
Yep, the two cases (running as part of the operation vs running afterward) definitely carry different semantics, and supporting both of them makes sense. One especially notable difference in my implementation is:

- handlers (like the `print` lambdas) that run as part of the operation effectively are part of the "unpublish" phase
- they can therefore run in a different task context than you might expect (`current_task()`, etc), which is a bit of a footgun
, etc, which is a bit of a footgun[using e.g.
partial(send_channel.send, foo)
as a selectable]
My first draft did some magic so that all of

```python
await send_channel.send(value)
send_channel.send(value).nowait()
await select(send_channel.send(value), receive_channel.receive())
```

would work. I deemed that too magical and switched to the current version, where the way you're planning on using the function gets written before the parens. It could change back though. One wrinkle is: how do you turn `partial(send_channel.send, value)` into an operation? If you expect to get something useful from calling it, it's tricky to also maintain the "coroutine was never awaited" warning. I guess you can look inside the `partial` object without calling it, but that seems error-prone to me.
Currently I have the post-decorator `send_channel.send(value)` directly return a coroutine object for the internal `perform_operation(send_channel.send.operation(value))` (it uses `def ...: return ...` rather than `async def ...: return await ...`). The places that expect an operation know how to detect that they were passed a `perform_operation` coroutine object, and if they get one they pull out the operation object from `f_locals` and throw an exception "use MemorySendChannel.send.operation(...), not MemorySendChannel.send(...)". Changing that to just use the operation object instead of complaining would be easy, but maybe we don't want to train our users to write `some_async_operation(args)` without an `await` in front of it ;-)
I guess we could combine these tricks, actually: if you're expecting an operation and you get a callable instead, try calling it with no arguments; if you get a coroutine object for `perform_operation` back, use `coro.cr_frame.f_locals["operation"]` as your operation and call `coro.close()` to suppress the unawaited warning. Too magic, or just the right amount? There's something that does feel very "trionic" about writing `await select(<list of async thunks>)`. On the gripping hand, it could get quite confusing that this doesn't work for any async thunk... more thought needed.
Side note: I discovered that it is much harder than I thought to write an awaitable object using an async function. Edit: nope, I just didn't realize you can call `__await__()` directly on a coroutine object. Order has been restored to the universe.
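For the record, the trick mentioned here is ordinary Python: a plain class becomes awaitable by delegating its `__await__` to a coroutine object's own `__await__()`. A toy example (not Trio code; uses asyncio only to have something that can run a coroutine):

```python
import asyncio

class AwaitableWrapper:
    """Awaitable object implemented in terms of an async function."""

    def __init__(self, coro):
        self._coro = coro

    def __await__(self):
        # Coroutine objects expose __await__() directly; just delegate.
        return self._coro.__await__()

async def answer():
    return 42

async def main():
    return await AwaitableWrapper(answer())

print(asyncio.run(main()))  # 42
```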
Demand for `select` hasn't been that strong because, well, there's no such thing yet, thus we restructure stuff so that we can compose things with tasks instead, and thenceforth we don't need `select` any more.

However, there are a couple of pieces of code I'd like to translate from Go to Python (because, you know, Python ;-) ) and having `select` would make that kind of job a whole lot more straightforward.
The blog post in the OP also mentions Reagents as a generalization of CML. From the comments:
> Reagents do generalise CML. The main difference is that CML only allows you to combine operations with select (op1 OR op2) while Reagents also allow you to combine operations into a transaction (op1 AND op2 for independent operations and op1 THEN op2 for dependent ones).
Basically, using this comment's notation, THEN performs op1 and op2 sequentially, passing the output of op1 as the input of op2, while AND performs them in parallel, returning the output of both op1 and op2 in a tuple (which basically corresponds to join patterns). The key is that all these forms of composition can be combined arbitrarily, so you can AND things together and then use the resulting operation as a single unit in a select, or whatever. (More links, in the context of OCaml this time.)
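Ignoring all the atomicity machinery, the composition algebra being described can be illustrated on plain functions (purely illustrative; real Reagents compile these combinations down to a CAS protocol rather than just calling functions):

```python
def then(op1, op2):
    # THEN: dependent composition -- op1's output feeds op2.
    return lambda x: op2(op1(x))

def and_(op1, op2):
    # AND: independent composition -- run both, return both outputs.
    return lambda x: (op1(x), op2(x))

double = lambda x: 2 * x
inc = lambda x: x + 1

assert then(double, inc)(10) == 21        # (10 * 2) + 1
assert and_(double, inc)(10) == (20, 11)
# Combined arbitrarily: AND two ops, then treat the result as one unit.
assert then(and_(double, inc), sum)(10) == 31
```

The hard part, which this sketch entirely elides, is making such a combination commit atomically or not at all, which is the point njsmith raises below.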
Reagents was originally conceived as a low-level library for lock-free programming:
> Reagents are lightweight in that the library analyses the combined operation and then figures out an efficient CAS scheme to execute it. Reagents also include some more low level operations, such as CAS as a reagent.
But I don't know whether that is somehow intrinsic to the whole concept, or if the basic programming model is separable from that. (I'd guess that it would be, but it's just a guess.)
Anyway, I don't know how relevant any of this might be to Trio :), just figured it might be worth a mention. @oremanj mentioned that select can be emulated to some extent using structured concurrency and cancellation, so it makes me wonder if there are any other connections.
@oremanj

> One wrinkle is: how do you turn `partial(send_channel.send, value)` into an operation? If you expect to get something useful from calling it, it's tricky to also maintain the "coroutine was never awaited" warning. I guess you can look inside the `partial` object without calling it, but that seems error-prone to me.
Yeah, `partial` objects are introspectable, so I was imagining `partial_obj.func.operation(*partial_obj.args, **partial_obj.keywords)`. We do something similar when trying to sniff good task names. It would mean we would only support `partial` objects, not e.g. the equivalent `lambda`, but that doesn't seem worse than only supporting `.operation` objects? Partly this depends on how #470 resolves, but if we end up sticking with always using `partial` as our standard way to represent unevaluated thunks, then using them here would be pretty consistent.
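The introspection being proposed is real stdlib behavior: `functools.partial` exposes `.func`, `.args`, and `.keywords`, so an operation could be reconstructed without ever calling the partial. (The `.operation` attribute itself is hypothetical, so this sketch just rebuilds the plain call.)

```python
from functools import partial

def send(channel, value, *, timeout=None):
    return (channel, value, timeout)

p = partial(send, "chan-1", "hello", timeout=5)

# Everything needed to rebuild the call is available without calling p:
assert p.func is send
assert p.args == ("chan-1", "hello")
assert p.keywords == {"timeout": 5}

# i.e. p.func.operation(*p.args, **p.keywords) could be formed here,
# if p.func exposed a (hypothetical) .operation attribute.
assert p.func(*p.args, **p.keywords) == ("chan-1", "hello", 5)
```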
@smurfix

> However, there are a couple of pieces of code I'd like to translate from Go to Python (because, you know, Python ;-) ) and having `select` would make that kind of job a whole lot more straightforward.
Well, `select` will at the least exist in a library you can use :-)
@glaebhoerl

> The blog post in the OP also mentions Reagents as a generalization of CML
Heh, I just stumbled on those through some completely different path yesterday... I actually have Aaron Turon's thesis open, but I haven't wrapped my head around these yet :-).
However, based on first principles, I feel like there must be some pretty substantial limitations. In Trio, you can sorta fake `select` by opening a nursery and racing all the operations against each other, and as soon as one finishes cancel the rest. BUT, this misses out on a crucial part of CML's `select` semantics: `select` guarantees that exactly one of the operations completes, and the rest don't happen at all. With the fake version, there's no way to rule out having two operations complete at the same time. The atomicity guarantee is what makes `select` strictly more powerful than other operations we have.
But it also majorly limits what kinds of operations you can `select` on: you need a single instant in time where you realize that an operation is going to commit, and then at that instant, you need to be able to roll back all the other operations that aren't going to commit.
In general, `OR` preserves this kind of atomicity, but `THEN` and `AND` do not. There may be ways to save it for particular operations – e.g. with `AND`, if the left operation becomes eligible to commit first, maybe you can somehow pause it there while waiting to find out if the right operation commits? (But this is user-visible: if you have unbuffered channels and do `chan1.receive AND chan2.receive`, it means `chan1.send` will block until someone does `chan2.send`.) In the case of Reagents, I think the solution is that all of its operations have to be compiled down to a big kCAS? So e.g. I'm pretty sure it's possible with CML to make operations like `sock.send(...)` selectable, but I don't see how it could be possible to make them Reagents operations.

OTOH, that might still be powerful enough to implement all the operations that we actually want to support `select` on...
This article seems relevant here: https://medium.com/@elizarov/deadlocks-in-non-hierarchical-csp-e5910d137cc
In #1411 I was asked to present my use cases for `select()` here. I had two actually:
The first is some kind of message queue/protocol endpoint. Similar to #467, I send requests to different objects and get back responses from them; however, there is no unique matching of a response to a request by id. Instead, I send a request for x and get back one of multiple possible responses regarding x, say a confirmation or an error. The initial idea was to have a channel for confirmations and a channel for errors (per object ofc), which would have been a nice API surface also for requests that cause multiple responses.

Now I would need to `select()` exactly one message from the channels from which I expect my single response. I must not receive from both channels at once; the second message actually is the response to the next request (and should be returned from the next `.receive()` call).
I gather mailboxes like in Erlang would be the best solution for this. For now, I implemented a solution with callbacks, installing "matchers" in a queue which are evaluated in the receiver task until it finds one that can handle the message. Then it removes all belonging to the same "group" from the queue, so that none will be used again, before sending the single message to the respective "communication" task (who installed the matchers) with help of a dedicated event or channel.
The second is a state machine.

Yes, I've read the article series on "Structured Concurrency" linked above, and I kinda agree with the author that all state machines can equivalently be expressed with communicating concurrent tasks instead. However, this is not actually simpler or easier to understand for all problems, especially when your domain objects are specified as state machines already.

My particular problem is about getting a message with a count, and then doing that many requests for resources. However, the number of in-flight requests is limited, similar to a `CapacityLimiter` but with dynamic token sizes, so I had to roll my own solution there. Also, during that process one might get new count messages, and adjust the requests so that the number of responses received after the last count matches that number. (Notice I do not want to cancel in-flight requests, but I might want to cancel waiting for the next available token, but only if I no longer need any.)
I arrived at the following solution:

```python
results = []
expected_length = 0
sent_requests = 0

async def do_request(token):
    with token:
        res = await self.send_request_and_receive_response()
    results.append(res)
    if len(results) == expected_length:
        nursery.cancel_scope.cancel()  # stop handle_count_updates

sending = None

async def do_all_requests():
    nonlocal sent_requests, sending
    if sending:
        return
    sending = trio.CancelScope()
    with sending:
        while sent_requests < expected_length:
            token = await self.get_request_token()
            nursery.start_soon(do_request, token)
            sent_requests += 1
    sending = None

async def handle_count_updates():
    nonlocal sent_requests, expected_length
    async for count in self.count_messages:
        sent_requests -= (count - expected_length)  # 0 + still in flight
        expected_length = count
        results = []
        if sending and sent_requests >= expected_length:
            sending.cancel()
        else:
            nursery.start_soon(do_all_requests)

async with trio.open_nursery() as nursery:
    nursery.start_soon(handle_count_updates)
return results
```
This basically has the state components (`sent_requests`, `expected_length`) spelled out, and then for each possible event there's a separate receive loop in a separate task, which needs to use `nonlocal` to manipulate the state. In contrast, with `SELECT` this seems so much simpler (and also shorter):
```python
results = []
expected_length = 0
sent_requests = 0

async def do_request(token):
    with token:
        res = await self.send_request_and_receive_response()
        results.append(res)
        if len(results) == expected_length:
            nursery.cancel_scope.cancel()  # stop receiving count_messages

async with trio.open_nursery() as nursery:
    waiting_token = NEVER()
    while True:
        token, count = await SELECT(waiting_token, self.count_messages.receive())
        if token is not None:
            nursery.start_soon(do_request, token)
            sent_requests += 1
        waiting_token = self.get_request_token() if sent_requests < expected_length else NEVER()
        if count is not None:
            sent_requests -= (count - expected_length)  # 0 + still in flight
            expected_length = count
            results = []
return results
```
(General feedback welcome as well)
So from @wingo's blog I have learned about Concurrent ML. (His concurrency tag is also worth perusing generally.)
The core insight, IIUC, is to take the atomic blocking operations – roughly, the same ones our current cancellation guarantee applies to – and systematically split them into a few consistent sub-operations:

- "try it": attempt the operation synchronously, either succeeding or failing immediately
- "publish": register the blocked task to be woken when the operation might succeed
- "unpublish": unregister the blocked task again
We already implicitly have this basic structure open-coded in a bunch of places, e.g. the `_try_sync` helper in `trio/socket.py`, the classes in `trio/_sync.py`, etc. Pretty much anywhere you see the `yield_if_cancelled`/`yield_briefly_no_cancel` pair in trio fits this general pattern, and "unpublish" is basically just our abort callback. So the advantage of reifying this would partly be just to simplify the code by having a single implementation of the overall pattern that we could slot things into – but even more so because, given the above pieces, you can create generic implementations of three variants:

1. a non-blocking version (replacing the `*_nowait` variants that we currently implement in an ad hoc way)
2. a blocking version
3. `select`, where you can say "perform exactly one of the following operations: ..."

(The first is done by just calling the "try this" sub-operation; the second you do by trying and then blocking if you fail, in a loop, with the unpublish operation as the abort callback; the third is done by calling a bunch of "try this" sub-operations, and if one succeeds you're done; otherwise you publish all the operations and go to sleep. There are some subtleties around knowing which operation woke you up, when unpublish happens, etc., but that's the basic idea.)
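To make the split concrete, here is a minimal sketch of the sub-operation interface and the two synchronous generic variants. The `Operation` protocol, `WouldBlock`, and the toy channel are all hypothetical names for illustration, not trio's actual API; the blocking variant and the sleeping half of `select` are elided because they need a real scheduler:

```python
class WouldBlock(Exception):
    # Stand-in for trio.WouldBlock in this sketch.
    pass

class Operation:
    # Hypothetical protocol: each atomic blocking operation is split
    # into the three sub-operations described above.
    def attempt(self):          # "try it": succeed now or raise WouldBlock
        raise NotImplementedError
    def publish(self, wake):    # register a wakeup callback
        raise NotImplementedError
    def unpublish(self):        # unregister (== trio's abort callback)
        raise NotImplementedError

def perform_nowait(op):
    # Generic *_nowait: just the "try it" sub-operation.
    return op.attempt()

def try_select(*ops):
    # The synchronous half of a golang-style select: try each operation
    # in turn; if one succeeds, we're done.  The real version would then
    # publish all the operations and put the task to sleep.
    for op in ops:
        try:
            return op, op.attempt()
        except WouldBlock:
            pass
    raise WouldBlock

class ToyReceive(Operation):
    # Toy operation: receive from a list acting as a buffered channel.
    def __init__(self, buffer):
        self.buffer = buffer
    def attempt(self):
        if not self.buffer:
            raise WouldBlock
        return self.buffer.pop(0)
```

For example, `try_select(ToyReceive([]), ToyReceive([42]))` skips the empty channel and returns the second operation together with its value.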
Right now we have a bunch of manual implementations of `await x()`/`x_nowait()` primitives. It's not clear that we have enough, either; #14 has an example of a case where you semantically need `accept_nowait`, and for HTTP client connection pooling, when you pull a connection out of the pool you need something like `receive_some_nowait` to check whether the server has closed it before you try to use it.

Also, a golang-style `select` is potentially quite useful but isn't possible right now (at least for trio's built-in operations; of course you certainly could build a golang-style channels library on top, but then `select` would only work on that library's operations). You can spawn some child tasks to try doing all the things concurrently, but there's no way to make sure that no more than one completes – for that you would need to be able to (1) guarantee that all of the operations really are cleanly cancellable, and (2) perform the cancellation synchronously with the first operation completing, which isn't possible if the last thing an operation does after committing its work is to call `yield_briefly_no_cancel`.

An ancillary benefit is that if we expose these things as a standard interface to users, then this would also serve as very clear documentation of which operations actually are the atomic cancellable ones.
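For the record, the missing `accept_nowait` is easy to sketch on top of a plain non-blocking stdlib socket. This is a hypothetical helper, not trio's API; trio's version would operate on trio sockets and raise `trio.WouldBlock`:

```python
import socket

class WouldBlock(Exception):
    # Stand-in for trio.WouldBlock in this sketch.
    pass

def accept_nowait(listener):
    # "Try it" half of accept: return (conn, addr) if a connection is
    # already pending in the kernel backlog, raise WouldBlock otherwise.
    # Assumes `listener` is a listening socket in non-blocking mode.
    try:
        return listener.accept()
    except BlockingIOError:
        raise WouldBlock from None
```

The "publish" half would then just be the existing wait-until-readable machinery on the listening socket.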
But, there are also some issues, I think:
- IOCP: the above pattern works for BSD-style non-blocking socket operations, but not for Windows IOCP operations (#52). You can implement cancellable blocking operations as IOCP calls (that's basically what they are), and nowait operations using Windows' non-blocking calls, but IOCP has no generic equivalent of epoll to implement wake-me-when-this-non-blocking-call-might-succeed, which means that golang-`select` is not possible. All you can do is ask the kernel to start initiating all the operations, and then by the time you find out that, say, your `recv` has finished, your `send` might also have finished. I guess this might be possible to work around: for `send` I'm pretty sure we can treat IOCP as a kind of epoll and then use non-blocking send. For `recv` I'm not sure if this works, and for `accept` I'm pretty sure it doesn't, but for these operations you can more-or-less fake them as being always cancellable by using a little user-space buffer to push back any result that you want to pretend didn't happen. Are there any other cases? Do we need to change the pattern to accommodate this? The pushback trick doesn't seem compatible with a strict separation between "try it" and "wake me when you want to try again" primitives.

- The main problem with HTTP connection pooling isn't checking if the socket is readable – we can already do that in the specific case of a raw socket. It's that this isn't something you can reasonably abstract in terms of generic "streams". In particular, if you have a TLS-wrapped socket, then you actually don't want to check whether the TLS layer has data available; you really do want to check the raw socket underneath. And in any case I'm not sure that this would help make it easier to implement `receive_some_nowait` as a generic feature on streams, because receiving on a TLS stream is a very complex operation that may require things like lock acquisition, and all of the operations above have to be synchronous. So maybe the HTTP connection pool case isn't really a good motivator anyway.

- `Lock.acquire` and `Lock.acquire_nowait` are tricky because of fairness (#54); it's not the case that `acquire` is just a loop like `while acquire_nowait fails: sleep until it might succeed`, because that creates a race on wakeup. I don't think it's possible to implement a fair mutex using just the framework described above. The problem is that we really need the two operations to be "try it" and "block until the operation is done"; a retry loop just doesn't work. So maybe this is actually equivalent to the IOCP case? Maybe we need a different set of primitives. I think this would be flexible enough to implement fair synchronization primitives and handle all the public operations. E.g. for golang-`select` we would want to arrange things so that when one operation gets rescheduled, we immediately abort all the other operations, before waiting to actually be woken up – this would need to happen from the context of the task that's handing off the mutex (for example).

But... this isn't quite right for stuff like non-blocking socket operations, where you actually need a try-sleep-retry-sleep-retry-... loop. Need to think some more about this.
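The "block until the operation is done" idea can be sketched as a hand-off lock, where `release` transfers ownership directly to the oldest waiter instead of waking it up to retry. Everything here is a made-up illustration, not trio code; plain callbacks stand in for rescheduling a blocked task:

```python
from collections import deque

class FairLock:
    # Toy sketch: release() hands the lock directly to the oldest
    # waiter, so the lock is never observably unlocked in between and
    # no other task can barge in (the wakeup race a retry loop creates).
    def __init__(self):
        self._locked = False
        self._waiters = deque()

    def acquire_nowait(self):
        # "Try it" sub-operation.
        if self._locked:
            return False
        self._locked = True
        return True

    def acquire(self, on_acquired):
        # "Try it, else publish": on success the callback runs at once;
        # otherwise it runs later, synchronously with the hand-off.
        if self.acquire_nowait():
            on_acquired()
        else:
            self._waiters.append(on_acquired)

    def release(self):
        if self._waiters:
            # Hand-off: the acquire completes synchronously with the
            # reschedule, so wakeup order is strictly FIFO.
            self._waiters.popleft()()
        else:
            self._locked = False
```

The key property is that `acquire` never loops: once a waiter is published, the *only* way it proceeds is by `release` completing the operation on its behalf, which is exactly what the retry-loop framework above can't express.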