dhirschfeld opened 6 years ago
I'm on win64 running:
python 3.6.5
trio 0.4.0
trio_asyncio 0.7.0
That check was removed in: https://github.com/python-trio/trio/pull/610
There might be other problems, but I suspect the immediate roadblock here is gone...
So @dhirschfeld, were there other problems? (I'm too lazy to try this myself.)
Sorry, haven't circled back to this just yet. It's on my TODO list but a pretty long way down at the moment.
...so, I got curious whether this would Just Work now, in particular after #66. Unfortunately I'm running into another error. I first needed to patch the function to pass in the loop:
import trio_asyncio
from functools import partial
from dask.distributed import Client

async def f():
    async with trio_asyncio.open_loop() as loop:
        client = await loop.run_asyncio(partial(Client, loop=loop, asynchronous=True))
        future = client.submit(lambda x: x + 1, 10)
        return await loop.run_future(future)
...but that then resulted in the below error:
In [3]: trio.run(f)
Traceback (most recent call last):
File "<ipython-input-3-03c750b89f65>", line 1, in <module>
trio.run(f)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio\_core\_run.py", line 1804, in run
raise runner.main_task_outcome.error
File "<ipython-input-2-27c49da4404c>", line 3, in f
client = await loop.run_asyncio(partial(Client, loop=loop, asynchronous=True))
File "c:\users\dhirschf\code\github\python-trio\trio-asyncio\trio_asyncio\adapter.py", line 82, in __await__
f = f(*self.args)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\client.py", line 726, in __init__
self.start(timeout=timeout)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\client.py", line 891, in start
sync(self.loop, self._start, **kwargs)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\utils.py", line 336, in sync
loop.add_callback(f)
AttributeError: 'TrioEventLoop' object has no attribute 'add_callback'
Does that mean that dask.distributed fundamentally won't work with trio-asyncio, or just that trio-asyncio is missing some functionality which would be easy/moderately hard/hard to implement?
There is no add_callback function in asyncio, so no wonder trio_asyncio doesn't have one either.
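For context: `add_callback` is a method of Tornado's `IOLoop`, while a stock asyncio loop offers `call_soon` (same thread) and `call_soon_threadsafe` (from other threads) instead. The stdlib-only sketch below checks which of those names a plain asyncio loop has and schedules one callback with `call_soon`. It assumes trio-asyncio's `TrioEventLoop` follows the standard asyncio loop interface in this respect.

```python
import asyncio


def loop_capabilities() -> dict:
    """Report which callback-scheduling names a stock asyncio loop provides."""
    loop = asyncio.new_event_loop()
    try:
        return {
            name: hasattr(loop, name)
            for name in ("add_callback", "call_soon", "call_soon_threadsafe")
        }
    finally:
        loop.close()


def run_one_callback() -> list:
    """Schedule a callback with call_soon, the closest same-thread analogue."""
    loop = asyncio.new_event_loop()
    order = []
    try:
        loop.call_soon(order.append, "ran via call_soon")
        loop.call_soon(loop.stop)  # stop once the queued callbacks have run
        loop.run_forever()
    finally:
        loop.close()
    return order


print(loop_capabilities())
# {'add_callback': False, 'call_soon': True, 'call_soon_threadsafe': True}
print(run_one_callback())  # ['ran via call_soon']
```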
Thx @smurfix - sounds like I'll have to take it up with distributed...
Maybe you need to somehow wrap the asyncio loop object in a tornado loop object, and pass that to distributed?
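The wrapping idea can be illustrated without Tornado. Tornado's asyncio-backed `IOLoop` is essentially an adapter that keeps a reference to the asyncio loop (the `asyncio_loop` attribute printed later in this thread) and implements `add_callback` on top of it. `AddCallbackShim` below is a hypothetical, stripped-down stand-in for such an adapter, assuming `call_soon_threadsafe` as the forwarding primitive. It is not Tornado's actual class.

```python
import asyncio
import functools
import threading


class AddCallbackShim:
    """Hypothetical wrapper giving an asyncio loop a Tornado-style add_callback."""

    def __init__(self, asyncio_loop: asyncio.AbstractEventLoop):
        # Mirrors the `asyncio_loop` attribute exposed by Tornado's wrapper.
        self.asyncio_loop = asyncio_loop

    def add_callback(self, callback, *args, **kwargs):
        # Thread-safe; kwargs are bound with partial because
        # call_soon_threadsafe only forwards positional arguments.
        self.asyncio_loop.call_soon_threadsafe(
            functools.partial(callback, *args, **kwargs)
        )


async def main() -> list:
    shim = AddCallbackShim(asyncio.get_running_loop())
    seen = []
    done = asyncio.Event()

    def record(value, *, tag):
        # Runs on the event-loop thread.
        seen.append((tag, value))
        done.set()

    # Call add_callback from outside the loop's thread, roughly the way
    # distributed's sync() helper calls loop.add_callback.
    threading.Thread(
        target=shim.add_callback, args=(record, 1), kwargs={"tag": "from-thread"}
    ).start()
    await done.wait()
    return seen


print(asyncio.run(main()))  # [('from-thread', 1)]
```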
On Fri, Feb 14, 2020, 04:52 Dave Hirschfeld notifications@github.com wrote:
Thanks for the suggestion @njsmith - I'll give that a go!
Just for informational purposes (in case anyone is interested): explicitly passing the loop was unnecessary, but I ran into a problem with different loops being used:
from functools import partial

import trio_asyncio
from dask.distributed import Client
from tornado.ioloop import IOLoop

async def f():
    async with trio_asyncio.open_loop() as loop:
        print(id(loop))
        print(id(IOLoop.current().asyncio_loop))
        client = await loop.run_asyncio(partial(Client, loop=IOLoop.current(), asynchronous=True))
        future = client.submit(lambda x: x + 1, 10)
        return await loop.run_future(future)
In [25]: await f()
1656317099400
1656317099400
Traceback (most recent call last):
File "<ipython-input-25-9fe41d264da1>", line 4, in async-def-wrapper
File "<ipython-input-23-d02282e26f84>", line 5, in f
client = await loop.run_asyncio(partial(Client, loop=IOLoop.current(), asynchronous=True))
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\base.py", line 231, in run_aio_coroutine
return await run_aio_future(coro)
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\util.py", line 39, in run_aio_future
res = await trio.hazmat.wait_task_rescheduled(abort_cb)
File "~\envs\dev\lib\site-packages\trio\_core\_traps.py", line 165, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "~\envs\dev\lib\site-packages\outcome\_sync.py", line 111, in unwrap
raise captured_error
File "~\envs\dev\lib\asyncio\tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "~\envs\dev\lib\site-packages\distributed\client.py", line 957, in _start
**self._startup_kwargs
File "~\envs\dev\lib\site-packages\distributed\deploy\spec.py", line 365, in _
await self._start()
File "~\envs\dev\lib\site-packages\distributed\deploy\spec.py", line 290, in _start
await super()._start()
File "~\envs\dev\lib\site-packages\distributed\deploy\cluster.py", line 57, in _start
comm = await self.scheduler_comm.live_comm()
File "~\envs\dev\lib\site-packages\distributed\core.py", line 646, in live_comm
connection_args=self.connection_args,
File "~\envs\dev\lib\site-packages\distributed\comm\core.py", line 214, in connect
future, timeout=min(deadline - time(), 1)
File "~\envs\dev\lib\asyncio\tasks.py", line 442, in wait_for
return fut.result()
File "~\envs\dev\lib\site-packages\distributed\comm\tcp.py", line 349, in connect
ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
File "~\envs\dev\lib\site-packages\tornado\tcpclient.py", line 270, in connect
addrinfo = await self.resolver.resolve(host, port, af)
RuntimeError: Task <Task pending coro=<BaseTCPConnector.connect() running at ~\envs\dev\lib\site-packages\distributed\comm\tcp.py:349> cb=[_release_waiter(<Future pendi...1A4D379D8>()]>)() at ~\envs\dev\lib\asyncio\tasks.py:392]> got Future <Future pending> attached to a different loop
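This RuntimeError is asyncio's generic complaint when a task running on one event loop awaits a Future that belongs to another loop. Here that presumably means something inside distributed/Tornado created a Future on a different loop than the one driving the task. A minimal stdlib reproduction, using two ordinary asyncio loops as stand-ins for the two loops involved (an assumption about what happened inside distributed):

```python
import asyncio


def await_foreign_future() -> str:
    """Await a Future owned by one loop from a task running on another loop."""
    other_loop = asyncio.new_event_loop()  # stands in for "the other" loop
    try:
        foreign = other_loop.create_future()  # bound to other_loop

        async def main():
            await foreign  # the awaiting task runs on asyncio.run()'s own loop

        try:
            asyncio.run(main())
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        other_loop.close()


# Prints a message of the form "Task <...> got Future <...> attached to a different loop".
print(await_foreign_future())
```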
Removing IPython from the equation (by running it as a script) results in a different error:
In [3]: from dask.distributed import LocalCluster
In [5]: cluster = LocalCluster(n_workers=1, threads_per_worker=2)
In [6]: cluster.scheduler.address
Out[6]: 'tcp://127.0.0.1:60590'
import asyncio
from functools import partial
from tornado.ioloop import IOLoop
import trio
import trio_asyncio
from dask.distributed import Client

address = 'tcp://127.0.0.1:60590'

async def f():
    async with trio_asyncio.open_loop() as loop:
        client = await loop.run_asyncio(partial(Client, address, asynchronous=True))
        future = client.submit(lambda x: x + 1, 10)
        return await loop.run_future(future)

res = trio_asyncio.run(f)
print(res)
(dev) ~\code\sandbox> python .\test-trio-dask.py
distributed.client - ERROR - Error in callback <function run_aio_future.<locals>.done_cb at 0x0000019721A32DC8> of <Future: finished, type: builtins.int, key: lambda-845252994edca236a111f8f502902dff>:
Traceback (most recent call last):
File "~\envs\dev\lib\site-packages\trio\_core\_generated_run.py", line 96, in reschedule
return GLOBAL_RUN_CONTEXT.runner.reschedule(task, next_send)
AttributeError: '_thread._local' object has no attribute 'runner'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "~\envs\dev\lib\site-packages\distributed\client.py", line 287, in execute_callback
fn(fut)
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\util.py", line 25, in done_cb
trio.hazmat.reschedule(task, outcome.capture(future.result))
File "~\envs\dev\lib\site-packages\trio\_core\_generated_run.py", line 98, in reschedule
raise RuntimeError('must be called from async context')
RuntimeError: must be called from async context
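The first exception hints at the cause: trio keeps its per-run state on a `threading.local` (hence `'_thread._local' object has no attribute 'runner'`), so `reschedule` only works on the thread that is running trio. The "Error in callback" log line comes from distributed's `Future.add_done_callback`, whose callbacks run on a separate thread, which would explain trio-asyncio's `done_cb` landing on the wrong thread. The sketch below mimics that with a hypothetical `RUN_CONTEXT` and `reschedule`; they are stand-ins, not trio's internals. In real code, the supported route into trio from a foreign thread is something like `trio.lowlevel.TrioToken.run_sync_soon` (`trio.hazmat` was later renamed `trio.lowlevel`).

```python
import threading

# Hypothetical stand-in for trio's per-thread run context.
RUN_CONTEXT = threading.local()


def reschedule(task_name: str) -> str:
    """Hypothetical reschedule that, like trio's, needs the thread-local runner."""
    try:
        runner = RUN_CONTEXT.runner
    except AttributeError:
        raise RuntimeError("must be called from async context") from None
    return f"{runner} rescheduled {task_name}"


def demo() -> tuple:
    # Pretend the calling thread is the one running the event loop.
    RUN_CONTEXT.runner = "runner-on-main-thread"
    same_thread = reschedule("task-1")
    other_thread = []

    def from_worker():
        # A callback fired from another thread sees an empty threading.local.
        try:
            other_thread.append(reschedule("task-2"))
        except RuntimeError as exc:
            other_thread.append(f"error: {exc}")

    worker = threading.Thread(target=from_worker)
    worker.start()
    worker.join()
    return same_thread, other_thread[0]


print(demo())
# ('runner-on-main-thread rescheduled task-1', 'error: must be called from async context')
```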
Unless the traceback highlights some glaringly obvious bug in trio-asyncio, I'm leaning towards this probably not being possible (at this point in time).
Ugh. I'm not running any sort of dask server, so when I try I get this:
OSError: Timed out trying to connect to 'tcp://127.0.0.1:60590' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f7828df7e80>: ConnectionRefusedError: [Errno 111] Connection refused
This error message does not make sense: either the connection was refused (which implies that the server rejected us) or it timed out (which implies that the server didn't react at all). I filed a bug at https://github.com/dask/distributed/issues/3487
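One plausible reading of the combined message (an assumption, not checked against distributed's source here): the client keeps retrying a refused connection until its overall deadline expires, then reports the timeout together with the last underlying error. A stdlib sketch of that retry-until-deadline pattern, using a hypothetical `connect_with_retry` helper:

```python
import socket
import time


def connect_with_retry(host: str, port: int, timeout: float) -> socket.socket:
    """Hypothetical helper: retry until a deadline, then report the last error."""
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            return socket.create_connection((host, port), timeout=0.5)
        except OSError as exc:  # e.g. ConnectionRefusedError when nothing listens
            last_error = exc
            time.sleep(0.05)  # brief pause before the next attempt
    # The final error mentions both the timeout and the last refusal.
    raise OSError(
        f"Timed out trying to connect to {host}:{port} after {timeout} s: {last_error!r}"
    ) from last_error
```

Against a port with no listener, each attempt is refused, yet the caller finally sees a "Timed out" error that also carries the refusal.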
Anyway, if I needed a dask client I would tend to fork the thing and apply a global s/asyncio/trio/g – but as I would like to understand why this doesn't work, please tell me how you started the server part.
Sorry - I should've been clearer above.
Dask has a LocalCluster class which spins up a scheduler and worker processes. The first code block:
from dask.distributed import LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=2)
print(cluster.scheduler.address)
was run by me in a separate python session to start a cluster and get the address.
I did it in a separate process to avoid any contamination of the testing code. I've seen LocalCluster cause some issues (even if only mucking with stderr/stdout) in an IPython session. It's not recommended for production but is useful for testing.
Anyway, having started the dask cluster you would then need to change the address variable in the trio test script to connect to the scheduler you spun up externally.
"I would tend to fork the thing and apply a global s/asyncio/trio/g"
Yeah, I'm at the point of wondering if I need to start a dask-trio project! There would no doubt be a lot of benefits to it, but it might be a lot of work :/ Whilst dask itself is a pretty complex project, I'm hopeful that the client side is comparatively straightforward, with all the complexity being in the scheduler. I haven't yet looked into that.
It at least seems potentially feasible:
Client: something that users use to submit tasks to the scheduler. Would need to be rewritten but is fairly simple. Needs to know how to serialize functions, encode msgpack, and send data over a socket.
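To make that description concrete, here is a toy sketch of "serialize a function, encode a message, send it over a socket". It is not dask's wire protocol. JSON stands in for msgpack; the standard `pickle` stands in for the cloudpickle-style serialization dask uses for arbitrary functions (plain pickle cannot handle lambdas, hence `operator.add`); and the `submit` op name and the length-prefixed frame layout are hypothetical.

```python
import json
import operator
import pickle
import socket
import struct


def encode_task(key: str, func, *args) -> bytes:
    """Build one frame: two 4-byte big-endian lengths, a JSON header, a pickled payload."""
    header = json.dumps({"op": "submit", "key": key}).encode()  # hypothetical op name
    payload = pickle.dumps((func, args))
    return struct.pack("!II", len(header), len(payload)) + header + payload


def recv_exactly(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, since recv() may return fewer."""
    chunks = []
    while n:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("peer closed the connection mid-frame")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def decode_task(sock: socket.socket):
    """Read one frame back into (header, func, args)."""
    header_len, payload_len = struct.unpack("!II", recv_exactly(sock, 8))
    header = json.loads(recv_exactly(sock, header_len))
    func, args = pickle.loads(recv_exactly(sock, payload_len))
    return header, func, args


# A connected socket pair stands in for the client-to-scheduler connection.
client, scheduler = socket.socketpair()
with client, scheduler:
    client.sendall(encode_task("inc-10", operator.add, 10, 1))
    header, func, args = decode_task(scheduler)
    print(header["key"], func(*args))  # inc-10 11
```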
OK, I've looked at the code a bit, particularly that of Tornado.
Bottom line: I do think it's possible to teach Tornado to be compatible with trio-asyncio, but it would result in something rather fragile at best. (Its multi-loop support doesn't help. At all. Neither does its Python 2.7 compatibility …)
If you don't want to write a dask-trio client, another way forward would be to hack up a tornado-trio module, i.e. take Tornado's IOLoop and hack it until it talks to trio instead of asyncio, taking out two layers of indirection. That should be straightforward, if a bit of work, though you'd probably have to monkeypatch the static methods in tornado.ioloop.IOLoop to impersonate your new class instead.
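The monkeypatching step can be sketched with `unittest.mock.patch.object`, swapping a static `current()` so that callers receive a different loop object. `IOLoopStandIn` and `TrioIOLoop` are hypothetical classes. A real tornado-trio would patch `tornado.ioloop.IOLoop` itself and would dispatch callbacks into trio (for example via `trio.lowlevel.TrioToken.run_sync_soon`) rather than queueing them in a list.

```python
import functools
from unittest import mock


class IOLoopStandIn:
    """Hypothetical stand-in for tornado.ioloop.IOLoop; not the real class."""

    @staticmethod
    def current():
        return "asyncio-backed IOLoop"


class TrioIOLoop:
    """Hypothetical tornado-trio loop exposing an IOLoop-style add_callback."""

    def __init__(self):
        self.pending = []

    def add_callback(self, callback, *args, **kwargs):
        # A real implementation would hand this to trio, e.g. through
        # TrioToken.run_sync_soon; here it is only queued.
        self.pending.append(functools.partial(callback, *args, **kwargs))

    def run_pending(self):
        # Drain the queue in FIFO order.
        while self.pending:
            self.pending.pop(0)()


trio_loop = TrioIOLoop()
with mock.patch.object(IOLoopStandIn, "current", staticmethod(lambda: trio_loop)):
    # Code that asks for "IOLoop.current()" now receives the trio-backed loop.
    patched = IOLoopStandIn.current()
    patched.add_callback(print, "hello from the trio-backed loop")
    patched.run_pending()  # prints: hello from the trio-backed loop

# Outside the context manager the original static method is restored.
unpatched = IOLoopStandIn.current()
```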
My preferred solution would be dask-anyio.
Thanks for taking a look @smurfix, and for the helpful suggestions!
"My preferred solution would be dask-anyio."
That is sounding more and more like the way to go...
Moved from https://github.com/python-trio/trio/issues/552#issuecomment-401646008
results in the error: