neo4j / neo4j-python-driver

Neo4j Bolt driver for Python
https://neo4j.com/docs/api/python-driver/current/

Async: The future belongs to a different loop than the one specified as the loop argument #868

Closed Anjum48 closed 1 year ago

Anjum48 commented 1 year ago

Bug Report

I am using AsyncGraphDatabase to build a database asynchronously. My code pulls data from many RSS feeds and writes certain components to the database. Each feed can be treated individually, hence the async approach. The feeds are first successfully downloaded (non-async) and put into a list of tasks, so I know it's not some HTTP error.

If I process a small batch of tasks (100), my code usually runs fine - everything runs asynchronously and I'm seeing lots of CPU utilization and the expected speedup. However, when I run on the full list (1500+), I run into this error:

Traceback (most recent call last):
  File "/home/anjum/github/tapesearch/run_feed_updater.py", line 160, in <module>
    asyncio.run(main2(urls))
  File "/home/anjum/opt/python-3.10.8/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/anjum/opt/python-3.10.8/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/anjum/github/tapesearch/run_feed_updater.py", line 128, in main2
    await f
  File "/home/anjum/opt/python-3.10.8/lib/python3.10/asyncio/tasks.py", line 571, in _wait_for_one
    return f.result()  # May raise f.exception().
  File "/home/anjum/github/tapesearch/database/create_db.py", line 235, in process_rss_feed
    await session.execute_write(
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/work/session.py", line 704, in execute_write
    return await self._run_transaction(
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/work/session.py", line 476, in _run_transaction
    await self._open_transaction(
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/work/session.py", line 393, in _open_transaction
    await self._connect(access_mode=access_mode)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/work/session.py", line 121, in _connect
    await super()._connect(access_mode, **access_kwargs)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/work/workspace.py", line 194, in _connect
    self._connection = await self._pool.acquire(**acquire_kwargs_)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 787, in acquire
    connection = await self._acquire(
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 210, in _acquire
    or not await self.cond.wait(timeout)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async_compat/concurrency.py", line 404, in wait
    return await self._wait(timeout=timeout, me=me)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async_compat/concurrency.py", line 374, in _wait
    await wait_for(fut, timeout)
  File "/home/anjum/venv/tapesearch/lib/python3.10/site-packages/neo4j/_async_compat/shims/__init__.py", line 88, in wait_for
    fut = asyncio.ensure_future(fut, loop=loop)
  File "/home/anjum/opt/python-3.10.8/lib/python3.10/asyncio/tasks.py", line 615, in ensure_future
    return _ensure_future(coro_or_future, loop=loop)
  File "/home/anjum/opt/python-3.10.8/lib/python3.10/asyncio/tasks.py", line 621, in _ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

My code is laid out like this:

class DBBuilder:
    def __init__(self, db_name="neo4j"):

        self.driver = AsyncGraphDatabase.driver(
            os.getenv("NEO4J_URI"),
            auth=(
                os.getenv("NEO4J_USERNAME"),
                os.getenv("NEO4J_PASSWORD"),
            ),
        )
        self.db_name = db_name

    async def process_rss_feed(self, rss_feed, url):

        async with self.driver.session(database=self.db_name) as session:
            await session.execute_write(
                self.create_podcast, feed=rss_feed["feed"], url=url
            )

            try:
                existing_titles = await session.execute_read(
                    self._find_number_of_episodes, rss_feed=rss_feed, return_titles=True
                )
                n_new = len(rss_feed["entries"]) - len(existing_titles)

                if n_new > 0:
                    for i, entry in enumerate(rss_feed["entries"]):
                        if entry.get("title", "?") not in existing_titles:
                            await session.execute_write(
                                self.populate_episode, rss_feed=rss_feed, idx=i
                            )

                    logging.info(
                        f'🎙️ Added {n_new} episodes to {rss_feed["feed"]["title"]}'
                    )
            except KeyError as e:
                logging.error(f"❗️{e} in {url}")

Where create_podcast, _find_number_of_episodes & populate_episode are async and awaited in the same way described here

My code is launched like this:

async def main2(urls):
    tasks = []
    for url in tqdm_original(urls):
        try:
            rss_feed = feedparser.parse(url)
            task = asyncio.create_task(db_writer.process_rss_feed(rss_feed, url))
            tasks.append(task)
        except TimeoutError:
            logging.error(f"❗️Timeout {url}")

    for f in tqdm.asyncio.tqdm.as_completed(tasks):
        await f

asyncio.run(main2(urls))

Apologies for the lack of a reproducible example, but the issue is difficult to pin down because it occurs randomly. If you can steer me in the right direction, maybe I can help isolate it.

My Environment

Python Version: 3.10.8
Driver Version: 5.2.0
Server Version and Edition: Neo4j 4.3.6 enterprise
Operating System: Ubuntu 22.04

Anjum48 commented 1 year ago

I think there is some sort of internal transaction limit that is being reached causing the above to silently fail. I found using a Semaphore fixed this:

async def main2(urls):
    sem = asyncio.Semaphore(100)

    async def safe_process(rss_feed, url):
        async with sem:
            return await db_writer.process_rss_feed(rss_feed, url)

    start = time()

    tasks = []
    for url in tqdm_original(urls):
        try:
            rss_feed = feedparser.parse(url)
            task = asyncio.create_task(safe_process(rss_feed, url))
            tasks.append(task)
        except TimeoutError:
            logging.error(f"❗️Timeout {url}")

    await tqdm.asyncio.tqdm.gather(*tasks)
    print(f"Finished in {(time() - start) / 60:.1f} minutes")

It might be useful to see what is failing in neo4j so that a more useful warning can be shown.

robsdedude commented 1 year ago

I have been looking into this for quite a while now and am sadly not able to reproduce the error. I'm also very surprised by it because the driver is certainly not starting or stopping any event loops. And if you only call asyncio.run once and don't fiddle with event loops otherwise, there should only ever be one loop running. I'll keep playing around and see if I can find the root cause of the error.

Until then, here is some general advice on your code.

  1. As you noticed, too much concurrency is actually destructive and doesn't help performance at all. The reason is that the driver has a connection pool and each session borrows 0-1 connections at any given time. The pool is concurrency-safe (using locks, signals, and so on, i.e., the more concurrency, the more interactions, the slower), and it's limited to 100 connections by default. You don't want to open 1.5k+ sockets to your DB. So when 1.5k+ Tasks come running to the pool asking "gimme connection plz", they'll start blocking each other, potentially causing timeouts (which you should see as errors bubbling up, though).
    So maybe something like https://pypi.org/project/asyncio-pool/ might be beneficial for your code.
  2. Per feed, you manipulate your database in three separate transactions (create_podcast, _find_number_of_episodes, and populate_episode). To me it feels like these should maybe be one transaction to give you atomicity, but I can't know for sure without knowing what the queries actually do.
  3. Often, when bulk inserting data, the application gets bottlenecked by network round-trip times, which async will mitigate, but also by other per-query overheads. So by sending 1 query per datapoint (or actually 3 in your case) you lose quite some time. Try rewriting the importer in a way that batches the feeds, i.e., send an object containing multiple records at once and use Cypher UNWIND to create multiple nodes/relationships in one query. The right batch size is highly use-case dependent, but 100 or 1000 might be a good starting point.
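
The batching idea in point 3 could look roughly like the sketch below. It is only an illustration under assumptions: the Episode label, the title and url properties, and the helper names chunked, _write_batch and write_episodes are hypothetical and not taken from the reporter's code. The session argument is expected to be an async driver session.

```python
import asyncio

# Hypothetical query: the label and property names are placeholders.
UPSERT_EPISODES = """
UNWIND $rows AS row
MERGE (e:Episode {title: row.title})
SET e.url = row.url
"""


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def _write_batch(tx, rows):
    # One query handles every row of the batch in a single round trip.
    result = await tx.run(UPSERT_EPISODES, rows=rows)
    await result.consume()


async def write_episodes(session, episodes, batch_size=1000):
    """Write `episodes` (a list of dicts) in batches, one transaction each."""
    for batch in chunked(episodes, batch_size):
        await session.execute_write(_write_batch, batch)
```

With 1500 feeds this turns thousands of single-row transactions into a handful of larger ones; the batch size is the knob to tune.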

robsdedude commented 1 year ago

I think I have found a way to reproduce the error just now. I created the async driver outside of main2. On driver creation, the pool is created, which in turn creates the synchronization primitives like locks and such. These will call asyncio.get_event_loop, which might return a different loop than the one running main2. Try to create the driver after creating the loop (e.g., inside main2) and let me know if that changes anything.
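
One possible shape for that refactoring is sketched below. It assumes the class can accept a ready-made driver instead of building one in __init__; make_driver, run_with_driver and work are hypothetical names used only for illustration.

```python
import asyncio


class DBBuilder:
    """Cut-down stand-in for the class in the report: it receives a driver
    instead of creating one in __init__."""

    def __init__(self, driver, db_name="neo4j"):
        self.driver = driver
        self.db_name = db_name


async def run_with_driver(make_driver, work):
    # asyncio.run() has already started the loop by the time this executes,
    # so whatever the driver creates internally sees the same loop that
    # will later run the tasks.
    driver = make_driver()
    try:
        return await work(DBBuilder(driver))
    finally:
        await driver.close()
```

With the real driver, make_driver would be something like lambda: AsyncGraphDatabase.driver(uri, auth=(user, password)), work would be a coroutine function that creates and awaits the per-feed tasks, and the whole thing would be started with asyncio.run(run_with_driver(make_driver, work)).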

Anjum48 commented 1 year ago

Thanks for looking into this and I'm glad you were able to reproduce it. The scenario in your last message is exactly how I have my code set up, so your diagnosis is spot on. At the moment my code is structured using a class (essentially using the same style as the HelloWorldExample here https://neo4j.com/developer/python/#python-driver), so I think creating the task list before creating the driver would require a bit of refactoring at this moment.

Limiting concurrent tasks to 100 with a Semaphore seems to work great for me at the moment though as I'm now seeing full CPU utilization. This might still be an issue for users with a large number of CPU cores though?

robsdedude commented 1 year ago

Limiting concurrent tasks to 100 with a Semaphore seems to work great for me at the moment though as I'm now seeing full CPU utilization.

This is a fragile solution and we might decide to make this usage of the driver fail in the future. The reason why this works is that the driver internally uses async synchronization constructs that are modeled on those of Python 3.7, which call asyncio.get_event_loop. That is a weird design, and it was changed in Python 3.10 to use get_running_loop, which fails if there is no loop running. That design makes much more sense, and the driver might change to it. While this could be argued to be a breaking change, I think it's actually a fix: it avoids errors like yours, which only happen under certain circumstances at runtime, are hard to catch and debug, and can become an evil surprise that makes it into production environments.

So I highly recommend you to refactor your code either way.
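
For illustration, the "fails if there is no loop running" behavior of asyncio.get_running_loop can be seen in isolation with the standard library alone. This is a small sketch, not driver code; loop_or_none is a hypothetical helper.

```python
import asyncio


def loop_or_none():
    """Return the running event loop, or None when called outside of one."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # Raised when no event loop is running in the current thread.
        return None


async def inside():
    return loop_or_none()


outside = loop_or_none()         # called from plain synchronous code
running = asyncio.run(inside())  # called while asyncio.run() drives a loop
```

Here outside is None, while running is the loop object that asyncio.run() created for inside().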

Anjum48 commented 1 year ago

Ok I see, this makes a lot of sense. I'll look into refactoring my code as soon as I can. Thanks!

robsdedude commented 1 year ago

For documentation purposes, should someone stumble across this later:

In fact, I misread the new Condition code in Python 3.10+. It explicitly fixes the issue that used to arise from creating async synchronization primitives when no event loop was running. The condition now binds to a loop the first time it's actually used in an async context. This is even better, as it will simply allow users to create an async driver in a sync function and use it asynchronously later.

Important note: this will likely only work when the user is on Python 3.10+ because the driver also relies on synchronization primitives that come with asyncio. So their behavior depends on the used Python version.