dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

No event loop in thread 'Dask-Default-Threads' #7387

Closed zar3bski closed 1 year ago

zar3bski commented 1 year ago

Describe the issue: I am trying to integrate some legacy code into dask.distributed. It involves instantiating an aiohttp.ClientSession on each worker only once and then fetching multiple URLs with it. ClientSession objects are not easily serializable, so I tried to implement an actor pattern to instantiate my Extractor on the workers. In my local/dev context (I haven't tried on a real cluster yet), I am facing:

Traceback (most recent call last):
  File "/home/zar3bski/Documents/Code/octaave/pocs/poc-dask/poc_dask/mcve.py", line 73, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/zar3bski/Documents/Code/octaave/pocs/poc-dask/poc_dask/mcve.py", line 58, in main
    remote_extractor = await client.submit(
  File "/home/zar3bski/.cache/pypoetry/virtualenvs/poc-dask-iG-N0GH5-py3.10/lib/python3.10/site-packages/distributed/client.py", line 293, in _result
    raise exc.with_traceback(tb)
  File "/home/zar3bski/Documents/Code/octaave/pocs/poc-dask/poc_dask/mcve.py", line 26, in __init__
    self.session = connector.session_factory()
  File "/home/zar3bski/Documents/Code/octaave/pocs/poc-dask/poc_dask/mcve.py", line 18, in session_factory
    session = ClientSession(
  File "/home/zar3bski/.cache/pypoetry/virtualenvs/poc-dask-iG-N0GH5-py3.10/lib/python3.10/site-packages/aiohttp/client.py", line 228, in __init__
    loop = get_running_loop(loop)
  File "/home/zar3bski/.cache/pypoetry/virtualenvs/poc-dask-iG-N0GH5-py3.10/lib/python3.10/site-packages/aiohttp/helpers.py", line 287, in get_running_loop
    loop = asyncio.get_event_loop()
  File "/usr/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Dask-Default-Threads-150225-0'.

I tried several approaches, but asyncio's get_running_loop never gets its loop.

Minimal Complete Verifiable Example:

import asyncio

from aiohttp import ClientSession
from distributed import Client

class Connector:
    def __init__(self, conf: dict):
        self.conf = conf

    def session_factory(self) -> ClientSession:
        session = ClientSession(
            headers=self.conf.get("headers", {}),
        )
        return session

class Extractor:
    def __init__(self, connector: Connector):
        self.session = connector.session_factory()

    async def job(self, url: str) -> str:
        resp = await self.session.get(url, allow_redirects=True)
        if resp.status == 200:
            data = await resp.read()
            return str(data)

    async def close_session(self):
        await self.session.close()

URLS = ["https://ambassador21.bandcamp.com/album/human-rage-2cd-deluxe-edition"]

async def main():
    client = await Client(asynchronous=True)

    # extraction
    remote_extractor = await client.submit(
        Extractor, Connector({"headers": {"SOME": "HEADERS"}}), actor=True
    )
    extractor = await remote_extractor  # Get back a pointer to that object

    futures = client.map(
        extractor.job,
        URLS,
        retries=5,
        pure=False,
    )
    _ = await client.gather(futures)

if __name__ == "__main__":
    asyncio.run(main())

Anything else we need to know?: I wondered whether configuring the Worker more explicitly could overcome this difficulty, but I haven't had any luck with this attempt:

client = await Client(
        asynchronous=True,
        address=LocalCluster(
            host="127.0.0.1",
            asynchronous=True,
            n_workers=2,
            processes=True,
            worker_class=Worker(io_loop=loop),
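            # Note: worker_class expects a worker class (e.g. Worker), not an
            # instance, and `loop` is not defined anywhere in this snippet.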
        ),
    )

Environment:

fjetter commented 1 year ago
  1. Your Extractor.__init__ calls the session factory, which creates a ClientSession. This already requires a running event loop, so you should ideally be doing it from within an async function (see the sketch after this list). See also https://docs.aiohttp.org/en/stable/faq.html#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous
  2. I think there is a misconception about what Actors do. Assuming your code ran, this would instantiate one Extractor on the whole cluster, not one per worker.
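
For illustration only (a sketch, not the fix proposed below): point 1 can be avoided by deferring session creation into a coroutine, reusing the Connector/Extractor names from the MCVE above.

from aiohttp import ClientSession

class Extractor:
    def __init__(self, connector):
        self.connector = connector
        self.session = None  # do NOT build the ClientSession here

    async def job(self, url: str) -> str:
        if self.session is None:
            # A coroutine always runs inside an event loop, so aiohttp
            # can safely pick up the running loop at this point.
            self.session = self.connector.session_factory()
        async with self.session.get(url, allow_redirects=True) as resp:
            return str(await resp.read())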

I suggest instead one of two possibilities:

  1. Use a Python interpreter-global cache to store your session (e.g. a global variable or functools.cache); a sketch follows the example below.
  2. Wrap your session in an object that is serializable and submit it as a dask task, for example:
class Connector:
    def __init__(self, conf: dict):
        self.conf = conf
        self._session = None

    def session_factory(self) -> ClientSession:
        if not self._session:
            self._session = ClientSession(
                headers=self.conf.get("headers", {}),
            )
        return self._session

    def __reduce__(self):
        # Rebuild a fresh Connector from its conf; the live session is
        # never pickled and gets recreated lazily on each worker.
        return (Connector, (self.conf,))

def get_connector(conf):
    return Connector(conf)

connector = client.submit(get_connector, conf)

def do_stuff(foo, connector):
    ...

client.map(do_stuff, range(10), connector=connector)

Dask will then replicate the Connector to every worker, where it will act as a session cache.
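
For comparison, here is a minimal sketch of the first suggestion (a per-process cache via functools); get_session is an illustrative name, and the coroutine is assumed to run inside an event loop, e.g. as a task on a worker:

from functools import lru_cache
from aiohttp import ClientSession

@lru_cache(maxsize=None)
def get_session(headers: frozenset) -> ClientSession:
    # One session per interpreter, i.e. per worker process. Must be called
    # from inside a running event loop (for example, from a coroutine task).
    return ClientSession(headers=dict(headers))

async def job(url: str) -> str:
    session = get_session(frozenset({"SOME": "HEADERS"}.items()))
    async with session.get(url, allow_redirects=True) as resp:
        return str(await resp.read())

Because the cache lives in the worker's interpreter, each worker process ends up holding exactly one session.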

zar3bski commented 1 year ago

Thanks for your answer, @fjetter. I already explored option 2 here: it is not easy to serialize these custom objects. Does the following implement option 1 properly?

from functools import cache

class Connector:
    def __init__(self, conf: dict):
        self.conf = conf

    @cache
    def session_factory(self) -> ClientSession:
        session = ClientSession(
            headers=self.conf.get("headers", {}),
        )
        return session

# No changes to Extractor

async def main():
    client = await Client(asynchronous=True)
    extractor = Extractor(Connector({}))

    futures = client.map(
        extractor.job,
        URLS,
        retries=5,
        pure=False,
    )
    _ = await client.gather(futures)
fjetter commented 1 year ago

Sorry for the late reply. I think that should work. I'll close this ticket now, but if you need further assistance, please reopen or consider posting a question on https://dask.discourse.group/, which is where we handle most usage-related questions.