dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Errors when calling `get_client` inside `client.sync`. #8914

Closed. trivialfis closed this issue 4 weeks ago.

trivialfis commented 4 weeks ago

Describe the issue:

Calling get_client inside a function invoked by client.sync produces two unexpected errors:

In XGBoost we use the client.sync method to handle asynchronous and synchronous environments uniformly. I'm not sure whether this is the best way to write a library; please share if there's a better approach.

Error message:

asynchronous: False
test_sync_call/asynchronous: True
/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/client.py:1730: RuntimeWarning: coroutine 'wait_for' was never awaited
  self.close()
/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/client.py:1730: RuntimeWarning: coroutine 'Client._close' was never awaited
  self.close()
Traceback (most recent call last):
  File "/home/jiamingy/workspace/xgboost_dev/XGBoostUtils/dask-issues/async_client.py", line 23, in <module>
    main(client)
  File "/home/jiamingy/workspace/xgboost_dev/XGBoostUtils/dask-issues/async_client.py", line 16, in main
    client.sync(test_sync_call)
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/utils.py", line 364, in sync
    return sync(
           ^^^^^
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/utils.py", line 440, in sync
    raise error
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/utils.py", line 413, in f
    future = asyncio.ensure_future(awaitable)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/asyncio/tasks.py", line 659, in ensure_future
    return _ensure_future(coro_or_future, loop=loop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/asyncio/tasks.py", line 674, in _ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable '
TypeError: An asyncio.Future, a coroutine or an awaitable is required

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jiamingy/workspace/xgboost_dev/XGBoostUtils/dask-issues/async_client.py", line 22, in <module>
    with Client(cluster) as client:
  File "/home/jiamingy/.anaconda/envs/xgboost_dev_125/lib/python3.11/site-packages/distributed/client.py", line 1720, in __exit__
    _current_client.reset(self._previous_as_current)
RuntimeError: <Token used var=<ContextVar name='_current_client' default=None at 0x7403fdace020> at 0x7403b6e27c80> has already been used once

Minimal Complete Verifiable Example:

from distributed import LocalCluster, Client, get_client

def test_sync_call():
    with get_client() as client:
        print("test_sync_call/asynchronous:", client.asynchronous)

def main(client: Client) -> None:
    print("asynchronous:", client.asynchronous)
    client.sync(test_sync_call)

if __name__ == "__main__":
    with LocalCluster(n_workers=7, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            main(client)

Environment:

pentschev commented 4 weeks ago

What happens here is that .sync() always assumes the function returns an awaitable, which is not the case when the function is synchronous, as in the original post. The docstring doesn't say whether this is intentional, but the sync function it calls internally documents that func must be a coroutine. So it seems to me this is indeed intentional: everything called with sync() should return an awaitable, and all existing tests assume that too. I won't claim to know the reasoning behind that decision, but I suspect it is deliberate, because accepting plain functions would leave the door open for users to write blocking code that never lets the event loop make progress, blocking the application indefinitely.
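The traceback above shows Distributed's internal sync helper passing the function's return value to asyncio.ensure_future, which is what raises the TypeError. The sketch below reproduces that mechanism with the standard library only. run_sync, start_loop_thread and stop_loop_thread are hypothetical names, and this is a simplified stand-in, not Distributed's actual code: it calls func on a loop running in a background thread and wraps the result with ensure_future, so a coroutine function works while a plain function fails.

```python
import asyncio
import threading


def start_loop_thread():
    """Start an event loop running forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def stop_loop_thread(loop, thread):
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


def run_sync(loop, func, *args, **kwargs):
    """Hypothetical stand-in for a loop-thread `sync` helper (not Distributed's code).

    func is called on the loop thread and its return value is handed to
    asyncio.ensure_future, so func must return an awaitable.
    """
    done = threading.Event()
    outcome = {}

    def finish(fut):
        if fut.cancelled():
            outcome["error"] = asyncio.CancelledError()
        elif fut.exception() is not None:
            outcome["error"] = fut.exception()
        else:
            outcome["value"] = fut.result()
        done.set()

    def callback():
        try:
            # A plain function returning a non-awaitable (e.g. None) makes
            # ensure_future raise TypeError, as in the issue's traceback.
            future = asyncio.ensure_future(func(*args, **kwargs))
        except Exception as exc:
            outcome["error"] = exc
            done.set()
            return
        future.add_done_callback(finish)

    loop.call_soon_threadsafe(callback)
    done.wait()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


def plain_function():
    return "sync result"


async def coroutine_function():
    await asyncio.sleep(0)
    return "async result"


if __name__ == "__main__":
    loop, thread = start_loop_thread()
    try:
        print(run_sync(loop, coroutine_function))  # async result
        try:
            run_sync(loop, plain_function)
        except TypeError as exc:
            print("TypeError:", exc)
    finally:
        stop_loop_thread(loop, thread)
```

Note that func itself runs on the loop thread, which is why a blocking body inside it would stall every other task scheduled on that loop.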

I think the important question is whether your function can be made asynchronous; if not, please let us know the constraints. Distributed maintainers who are more familiar with the internals may also have further comments on the design and constraints of sync(), or suggestions on how to move forward.
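If a function cannot be rewritten as native async code, one standard-library option (not proposed in this thread, and not tested against Distributed here) is to wrap the blocking call with asyncio.to_thread (Python 3.9+). That yields an awaitable and runs the blocking work in a worker thread, so the event loop keeps making progress. blocking_work, blocking_work_async and demo are hypothetical names; the heartbeat task only counts how often the loop gets to run while the blocking call is in flight.

```python
import asyncio
import contextlib
import time


def blocking_work(x):
    # Hypothetical synchronous function that cannot easily be rewritten as async.
    time.sleep(0.1)
    return x + 1


async def blocking_work_async(x):
    # asyncio.to_thread runs blocking_work in a worker thread and returns an
    # awaitable, so the event loop stays free in the meantime.
    return await asyncio.to_thread(blocking_work, x)


async def demo():
    ticks = 0

    async def heartbeat():
        # Counts how often the loop gets to run other work.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    result = await blocking_work_async(41)
    hb.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb
    return result, ticks


if __name__ == "__main__":
    result, ticks = asyncio.run(demo())
    # result is 42; a ticks count above 1 means the loop kept running.
    print(result, ticks)
```

Had blocking_work been called directly inside demo, the heartbeat could not run until it returned, which is the failure mode described above.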

trivialfis commented 4 weeks ago

@pentschev Thank you for the explanation and the references, that makes sense.

> I think it's probably important to determine whether your function can be made asynchronous

We have been doing that. I was trying to simplify the code in XGBoost, but async code is difficult to manage when we also need to support synchronous callers.
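One common way to support both kinds of callers is to write the logic once as a coroutine and decide at the public boundary whether to return the awaitable or block on it. The sketch below is a simplified, hypothetical version of that pattern using only the standard library: LoopRunner, dispatch, _train_async and train are illustrative names, not XGBoost or Distributed APIs. Synchronous callers get a value computed on a background loop via asyncio.run_coroutine_threadsafe; asynchronous callers get the coroutine and await it on their own loop.

```python
import asyncio
import threading


class LoopRunner:
    """Hypothetical helper that owns an event loop running in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


def dispatch(runner, asynchronous, coro_fn, *args, **kwargs):
    """Serve sync and async callers from a single coroutine implementation.

    asynchronous=True: return the coroutine so the caller awaits it on its own loop.
    asynchronous=False: block until it completes on the runner's background loop.
    """
    coro = coro_fn(*args, **kwargs)
    if asynchronous:
        return coro
    return asyncio.run_coroutine_threadsafe(coro, runner.loop).result()


async def _train_async(n):
    # Hypothetical library logic, written once as a coroutine.
    await asyncio.sleep(0)
    return n * 2


def train(runner, n, asynchronous=False):
    # Public entry point: a value for sync callers, an awaitable for async ones.
    return dispatch(runner, asynchronous, _train_async, n)


if __name__ == "__main__":
    runner = LoopRunner()
    try:
        print(train(runner, 21))  # 42, computed on the background loop

        async def async_caller():
            return await train(runner, 21, asynchronous=True)

        print(asyncio.run(async_caller()))  # 42, computed on asyncio.run's loop
    finally:
        runner.close()
```

One caveat with this design: the blocking path must never be taken from the runner's own loop thread, because .result() would wait on a coroutine that the blocked loop can never run.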

It's fine if this is intended behavior. I will close this issue now.