goodboy opened this issue 5 years ago
This is a really interesting idea. I'm not totally sure what to make of it, but it's very interesting. It seems like it would enable some kind of reactive-programming model across machines, which is a trippy thought. Like CycleJS for distributed systems. I'm not sure what the target application would be for such a system, because I'm remarkably uncreative.
The more I think about this the more I think we should be adopting the multi-task style considerations being discussed in the following `trio` related issues:
The entire "channels" related issue list is also worth keeping an eye on: https://github.com/python-trio/trio/issues?q=is%3Aopen+channels
We already have this api to run an async func and have it stream, but currently there's no way for the caller to send values to the callee. In essence we want the same streaming behavior as if you had two actors call each other and receive stream results, but in a more compact, single-context-channel API.
The thought I had was something like the following, based on our context api:
```python
@tractor.stream
async def streamer(
    ctx: tractor.Context,
) -> None:
    """A simple web response streaming server.
    """
    tx, rx = ctx.attach_stream()
    while True:
        url = await rx.recv()
        val = await web_request(url)

        # this is the same as ``yield`` in the async gen case
        await tx.send(val)


async def caller(urls: List[str]):
    async with tractor.open_nursery() as n:
        portal = await n.run_in_actor(streamer)
        tx, rx = portal.attach_stream()

        for url in urls:
            await tx.send(url)
            resp = await rx.recv()
```
I guess this would give us a more mem-chan-style, "select statement considered harmful" api? We can then have per-side shutdown signalling just like `trio`'s memory channels? I dunno, what do the lurkers think.
Ok, so I thought some more about this while designing some fairly involved persistent service system stuff, and I think we probably want a bit more stringency in this api.
second actor: the sub
```python
@tractor.context
async def streamer(
    ctx: tractor.Context,
    *args,
) -> None:
    """A simple web response streaming server.
    """
    # do setup stuff

    # signal to the calling actor that we are synced and ready;
    # the value passed to ``.started()`` is returned from the calling
    # side's ``.__aenter__()``
    await ctx.started(first_message_relayed_back_to_caller)

    # this will error if we haven't yet called ``.started()`` above;
    # this entry blocks until the other side also opens a stream
    async with ctx.open_stream() as (send, recv):
        while True:
            url = await recv()  # should this be clonable?
            val = await web_request(url)

            # this is the same as ``yield`` in the async gen case
            await send(val)
```
first actor: the parent / caller
```python
async def caller(urls: List[str]):
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('streamer')

        # ``.open_context()`` here returns some kind of caller end ``Context``
        ctx = await portal.open_context(streamer)

        async with ctx, ctx.open_stream() as (first_msg_from_started_call, (send, recv)):
            for url in urls:
                await send(url)
                resp = await recv()  # could be clonable?
```
The nice part of enforcing a context manager style "session" on the calling side is that we guarantee setup/teardown sync stages with the remote task. In particular, we can raise appropriate errors if for whatever reason either end has "terminated" their block without having to wrangle it through channel / connection state and messages.
Obviously this api also allows for just calling a function that acts like a context manager but doesn't have to initialize any streaming. In that case I'm not sure if the caller should also be required to `async with context.open_stream()`? In theory the `Context` type can implement the async context manager protocol methods and then just be used as a cross-actor cancel scope like we mentioned in #122?
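As a rough illustration of that last point, here's a minimal sketch of using the `Context` purely as a cross-actor cancel scope with no streaming at all; the `setup_service` target, the `'ready'` sync message and the teardown-on-exit behavior are all assumptions layered on the api proposed above, not anything that exists today:

```python
import trio
import tractor


@tractor.context
async def setup_service(ctx: tractor.Context) -> None:
    # hypothetical remote task: do setup, signal readiness, then idle
    # until the caller leaves (or cancels) its context block
    await ctx.started('ready')
    await trio.sleep_forever()


async def caller():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('service')

        # no stream is ever opened; the context acts purely as a
        # cross-actor cancel scope in the spirit of #122
        ctx = await portal.open_context(setup_service)
        async with ctx as first_msg:
            assert first_msg == 'ready'
            # do work against the now-ready remote service here; when
            # this block exits the remote task should be torn down too
            await trio.sleep(1)
```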
Yah so the main question is how many nested `async with`s you gotta do:
```python
ctx = await portal.open_context(stream)

async with ctx, ctx.open_stream() as (send, recv):
    # start streaming
    ...
```
Might be the more explicit, correct api since it distinguishes the pre-started sync point with the callee versus it being implied by `with`-ing `.open_stream()`?
Alternatively the stream could be opened without using `async with` and then provide objects for `send`, `recv` which support it much like `trio`'s mem chans?
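A quick sketch of what that alternative might look like; the standalone `attach_stream()` call and the `.aclose()` methods on each end are assumptions here, mirroring `trio`'s memory channel interface rather than any existing tractor API:

```python
# hypothetical: no ``async with`` required to start streaming
tx, rx = ctx.attach_stream()
try:
    await tx.send('https://example.com')
    resp = await rx.recv()
finally:
    # each side can be closed independently, just like trio's mem chans
    await tx.aclose()
    await rx.aclose()
```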
Interesting simple example from `arq`:
```python
import asyncio
from aiohttp import ClientSession
from arq import create_pool
from arq.connections import RedisSettings


async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)


async def startup(ctx):
    ctx['session'] = ClientSession()


async def shutdown(ctx):
    await ctx['session'].close()


async def main():
    redis = await create_pool(RedisSettings())
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)


# WorkerSettings defines the settings to use when creating the work,
# it's used by the arq cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```
Basically what I've proposed but with setup/teardown inside the target func. We should probably mock this up exactly using `httpx`.
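For reference, a rough mock-up of what that might look like against the context api sketched above; only `httpx.AsyncClient` and its `.get()` are real here, everything else (`@tractor.context`, `.started()`, `.open_stream()`) is still the hypothetical API from this thread:

```python
import httpx
import tractor


@tractor.context
async def url_streamer(ctx: tractor.Context) -> None:
    # setup: one client session for the lifetime of the context
    async with httpx.AsyncClient() as client:
        await ctx.started('ready')

        # stream urls in, stream response sizes back out
        async with ctx.open_stream() as (send, recv):
            while True:
                url = await recv()
                resp = await client.get(url)
                await send(len(resp.text))

    # teardown of the client happens automatically on block exit
```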
> Alternatively the stream could be opened without using `async with` and then provide objects for `send`, `recv` which support it much like `trio`'s mem chans?
Yeah I'm starting to think this might be the way to go and we should basically just build out a send-side equivalent of our existing `ReceiveStream` type and then offer the same interface as `trio`'s send chan.
The last thing I have questions about is whether we need to support individual send vs. recv stream cloning.
Cloning on a recv side stream makes obvious sense since it'll copy messages/packets to each consumer receiving from that stream (versus only one task getting the value and the others missing it). With a send side, though, I'm not so sure it's useful given the nature of an IPC oriented "channel".
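To see why copy-per-consumer recv cloning is attractive, here's a small `trio`-only demo of the status quo: with one shared receive channel each message is consumed by exactly one task and the other never sees it. The broadcast behavior described above would be the new part of the proposal, not something `trio` provides:

```python
import trio


async def consumer(name, rx):
    # each message is delivered to exactly one of the consumers
    async for msg in rx:
        print(name, 'got', msg)


async def main():
    tx, rx = trio.open_memory_channel(0)
    async with trio.open_nursery() as n:
        # both tasks share the *same* rx end
        n.start_soon(consumer, 'a', rx)
        n.start_soon(consumer, 'b', rx)

        async with tx:
            for i in range(4):
                await tx.send(i)
        # once tx is closed the consumers see EndOfChannel and exit


trio.run(main)
```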
The following comes from some rubber ducking on the `trio` channel.
The many-to-many example in the `trio` docs makes clear why having an implicit task semaphore built in to the memory channel is handy, since it avoids having to use more than one nursery to make sure bookkeeping is done correctly by the user.
> We could avoid this by using some complicated bookkeeping to make sure that only the last producer and the last consumer close their channel endpoints… but that would be tiresome and fragile.
This is, kinda true. It definitely requires more stringent task bookkeeping since you need one set of tasks (consumers) to be distinctly separate from another set (producers) in terms of closing each set's end of the channel. But, arguably you lose some supervision control; if you're interested in knowing which task set was the source of a failure you can't really tell.
For example, if you used the following code you don't really need cloning, but it requires 2 extra nurseries and distinctly concurrent task set allocations, such that either set can terminate and signal closing of its side of the channel without interfering with the other or requiring any task ordering or channel bookkeeping whatsoever:
```python
async def sender(tx):
    async with tx:
        async with trio.open_nursery() as t_n:
            t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)

    async with rx:
        async with trio.open_nursery() as r_n:
            r_n.start_soon(recv_stuff_with_rx, rx)
```
Following this example in detail you'll see it's the same result as with using `.clone()` on each side - all consumers and producers appropriately close off and signal to the other end when complete, due to the sub-task tree arrangement.
Thanks to `with` statement nesting this can be further simplified to:
```python
async def sender(tx, task_status=trio.TASK_STATUS_IGNORED):
    async with tx, trio.open_nursery() as t_n:
        t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)

    async with rx, trio.open_nursery() as r_n:
        r_n.start_soon(recv_stuff_with_rx, rx)
```
which, to me, is definitely no more fragile, and I'm not convinced it's that much less simple than:
```python
tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as n:
    n.start_soon(recv_stuff_with_rx, rx.clone())
    n.start_soon(send_stuff_with_tx, tx.clone())
```
but obviously now the `async with tx` / `rx` is embedded in the task funcs, which means you actually have no less code taking the `.clone()` approach, since you'll have the 2 `async with tx` blocks in each task function which are used to signal the close of each individual clone.
Yes, you have fewer nurseries and a flat task hierarchy, but you also lose lifetime control / error handling on each set individually. For example, if one of the producer tasks fails for a reason that can be handled, in the flat, single nursery case both sets must be restarted, versus possibly recovering one or the other side with supervision code. So afaict from a LOC perspective it's not really different, minus the `await root_n.start(sender, tx)` call used to spawn the task that in turn spawns the second task set. Again, yes it's more nurseries, but we like nurseries and with more nurseries comes more supervision controls.
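To make the supervision point concrete, here's a minimal sketch of restarting only the producer set while the consumer side keeps running; `send_stuff_with_tx` is the same placeholder task as above and `RecoverableProducerError` is a hypothetical error type:

```python
async def supervised_sender(tx):
    async with tx:
        while True:
            try:
                async with trio.open_nursery() as t_n:
                    t_n.start_soon(send_stuff_with_tx, tx)
            except RecoverableProducerError:
                # only the producer task set is torn down and respawned;
                # the consumer nursery and ``rx`` are never touched
                continue
            break
```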
Further, once we start thinking about multiple actors (processes), where the send vs. recv sides of the channel API are actually in completely separate memory spaces, I'm not sure the argument for fewer nurseries applies at all. If you have tasks in 2 separate actors they are already running under distinct actor-scoped nurseries, and there should be no way to have a single-direction side (meaning the send or recv side attached in separate processes) be oriented such that you can "close one side" without also closing some top level task or nursery.
Let's take the one example from above:
```python
# code which will be mostly duplicate in both processes
async with ctx.open_stream() as (send, recv):

    # yes you could spawn a bunch of tasks which send on clones
    # but closing all these will teardown only one side of the 2-way stream
    async with recv, trio.open_nursery() as n:

        # the main question is, when will you need it such that the above nursery
        # **does not also close** when all these tasks are complete?
        n.start_soon(send_stuff_with_tx, send.clone())

        # remember this is different because each task will receive **a copy of each message**
        # every time something new is sent from the other side
        n.start_soon(recv_stuff_with_rx, recv.clone())
```
The only case I can think of to justify `send.clone()` semantics is if you want each sending task to be able to close its own copy of the send side independently. But, when or why would you need that?
Thanks to @nimaje for some examples that might provide further incentive for send side cloning. Quoting from the `trio` gitter:
```python
async def run_pair(send_channel, …):

    async def partner(…):
        …
        try:
            while some_condition:
                …
                await send_channel.send(something)
                …
                if some_other_condition:
                    await send_channel.aclose()
        except trio.ClosedResourceError:
            # stop that stuff when the channel was closed
            pass
        …
        try:
            # maybe send something if possible
            await send_channel.send(something)
        except trio.ClosedResourceError:
            pass
        …

    async with trio.open_nursery() as nursery:
        nursery.start_soon(partner, …)
        nursery.start_soon(partner, …)


send_channel = …
async with trio.open_nursery() as nursery:
    # each of the pairs should independently stop using the channel
    # without closing them for the others
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)
```
if you don't have `.clone()` with separate closing you would need to build some communication between the partner tasks yourself, and what if you want to allow closing the channel for those partners from outside?
and the other case: a long running task allocated by the same nursery that creates tasks which will use and eventually close the channel when complete.
```python
# maybe every case of that is solvable with a second nursery
send_channel = …
async with send_channel, trio.open_nursery() as nursery:
    # scenario:
    # to signal the receiving side, send_channel should be closed as soon as possible,
    # but some_long_running_task will run longer than that in most cases
    # (you can't swap send_channel and nursery as that would close the send_channel directly after this body)
    nursery.start_soon(some_task, send_channel, …)
    nursery.start_soon(some_other_task, send_channel, …)
    nursery.start_soon(some_long_running_task, …)
```
ah, here is the variant with `.clone()` (the tasks would have their own `async with send_channel:` block):
```python
send_channel = …
async with trio.open_nursery() as nursery, send_channel:
    nursery.start_soon(some_task, send_channel.clone(), …)
    nursery.start_soon(some_other_task, send_channel.clone(), …)
    nursery.start_soon(some_long_running_task, …)
```
So I guess I'll try to summarize these 2 examples above: you want each sending task (or task set) to be able to `await send_channel.aclose()` once terminated, and you don't want any one task to destruct the channel before sibling tasks are complete.
I've already left some notes in the code about how we could do two way streaming using the native `received = yield sent` generator semantics (see the sketch below), but it's probably worth looking at how other projects are approaching it and if it's even a good idea.

Some projects I found through different communities recently:

- `faust` (though I think this is more so a demonstration of a need for a proper `asyncitertools`)

Another question is how to accomplish this with traditional messaging patterns like those found in `nng`. There were some suggestions in gitter about combining protocols/socket types.

More to come...
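For context, here's the plain-Python version of that `received = yield sent` idea: an async generator driven with `.asend()`. This is just a local sketch of the two-way semantics, not a tractor API:

```python
import trio


async def echo_len():
    # two-way generator: each ``yield`` ships a value out and evaluates
    # to whatever the driving side passes back in via ``.asend()``
    received = yield 'ready'
    while True:
        received = yield len(received)


async def main():
    gen = echo_len()
    first = await gen.asend(None)   # prime the generator
    assert first == 'ready'

    for msg in ('hello', 'world!'):
        out = await gen.asend(msg)  # ``received = msg`` inside the gen
        print(msg, '->', out)

    await gen.aclose()


trio.run(main)
```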
Update
The api I'm currently most convinced of is at this comment. Afaict it would also satisfy the needs of #122 to some degree, since cancelling a "context" would effectively be like cancelling a cross-actor scope.