neo4j / neo4j-python-driver

Neo4j Bolt driver for Python
https://neo4j.com/docs/api/python-driver/current/
Other
883 stars 187 forks source link

Asyncio errors when cleaning up async drivers #1044

Closed williamhakim10 closed 3 months ago

williamhakim10 commented 3 months ago

Bug Report

We've been experiencing issues with event loops and trying to close() on an AsyncNeo4jDriver.

This is the error we've been seeing:

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 1463, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 872, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 870, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 855, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/functions_framework/__init__.py", line 178, in view_func
    function(event)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/functions_framework/__init__.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/workspace/main.py", line 66, in schedule_collection
    asyncio.run(graph_db_template.dispose())
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/workspace/libs/database_support/graph_database_template.py", line 114, in dispose
    await self.__async_driver.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/driver.py", line 602, in close
    await self._pool.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 483, in close
    await self._close_connections(connections)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 417, in _close_connections
    await connection.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 953, in close
    await self.socket.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 165, in close
    await self._writer.wait_closed()
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/streams.py", line 343, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-364892' coro=<GraphDatabaseTemplate.dispose() running at /workspace/libs/database_support/graph_database_template.py:114> cb=[_run_until_complete_cb() at /layers/google.python.runtime/python/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

Our code for handling graph db drivers looks like this. I am including the handlers for dead connections for completeness, but I've tried removing them and refactoring the code to use execute_query or something else instead, and it doesn't matter.

class GraphDatabaseTemplate:
    def __init__(
        self, *, db_url: str, db_user: str, db_password: str, database: str
    ) -> None:
        ...
        self.__driver = self.__create_driver()
        self.__async_driver = self.__create_async_driver()

    def __create_driver(self) -> Driver:
        return GraphDatabase.driver(
            self.__db_url,
            auth=self.__db_auth,
            database=self.__default_db,
            max_connection_lifetime=40,
            keep_alive=True,
        )

    def __create_async_driver(self) -> AsyncDriver:
        return AsyncGraphDatabase.driver(
            self.__db_url,
            auth=self.__db_auth,
            database=self.__default_db,
            max_connection_lifetime=40,
            keep_alive=True,
        )

    @asynccontextmanager
    async def begin_async(self) -> AsyncIterator[AsyncSession]:
        async with self.__async_driver.session() as session:
            yield session

    async def query_async(
        self,
        *,
        statement: LiteralString,
        session: Optional[AsyncSession] = None,
        parameters: Optional[dict[str, Any]] = None,
        retries_remaining: int = _MAX_RETRIES,
    ) -> list[Record]:
        # AuraDB has a connection timeout of 60 minutes; this is the recommended way of handling SessionExpired errors
        # https://support.neo4j.com/s/article/1500001173021-How-to-handle-Session-Expired-Errors-while-connecting-to-Neo4j-Aura
        try:
            if session is None:
                async with self.begin_async() as session:
                    return [r async for r in await session.run(statement, parameters)]
            else:
                return [r async for r in await session.run(statement, parameters)]
        except SessionExpired:
            if retries_remaining:
                self.__async_driver = self.__create_async_driver()
                # if the connection expired, the session will correspond to the dead driver instance; retry without one
                return await self.query_async(
                    statement=statement,
                    session=None,
                    parameters=parameters,
                    retries_remaining=retries_remaining - 1,
                )
            raise

    def dispose(self) -> None:
        self.__driver.close()
        asyncio.run(self.__async_driver.close())

This code gets called inside of a Cloud Function which looks like this:

@functions_framework.cloud_event
def schedule_collection(event: CloudEvent) -> None:
   ...
    graph_db_template = GraphDatabaseTemplate(
        db_url=env.neo4j_url,
        db_user="neo4j",
        db_password=env.neo4j_password,
        database=env.neo4j_database,
    )
    try:
        ... do some stuff here that involves `asyncio.run(...)` and uses the async driver ...
    except Exception:
        logger.exception("unable to handle event %s", event)
    finally:
        graph_db_template.dispose()

I have tried various permutations of keeping track of the event loop (i.e. by storing it in the template) and then using a lower-level asyncio primitive which allows passing in a loop, as well as just allowing the driver to clean itself up without doing anything at all - in both cases I get a RuntimeError('Event loop is closed').

Note that I am able to recreate this behavior using a simple unittest test case with only the following parameters as long as the test in question actually queries the graph.

class TestCase(IsolatedAsyncioTestCase):
    def setUp(self) -> None:
        super().setUp()
        self.graph = test_graph_template()

    def tearDown(self) -> None:
        super().tearDown()
        self.graph.dispose()

    async def test_fails(self) -> None:
        await self.graph_db.query_async(statement="MATCH (n) RETURN n")

    async def test_passes(self) -> None:
        pass

I see that this came up previously (https://github.com/neo4j/neo4j-python-driver/issues/950) but obviously we're on 3.10 so that particular issue hasn't gone away.

Thanks for taking a look and appreciate the support!

My Environment

Python Version: Python 3.10.13 (main, Apr 8 2024, 10:10:13) [Clang 15.0.0 (clang-1500.3.9.4)] Driver Version: 5.19.0 Server Version and Edition: Neo4j EE 5.18.0 (locally, in tests) but also AuraDB Operating System: MacOS Sonoma 14.3, but also ubuntu:latest in Github CI and whatever linux version Cloud Run uses.

robsdedude commented 3 months ago

Hi and thanks for reaching out.

If my analysis is correct, the problem stems from the line

asyncio.run(self.__async_driver.close())

Let me quote the docs of asyncio.run:

This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the executor.

This function cannot be called when another asyncio event loop is running in the same thread.

So a new event loop will be started to execute the passed coroutine (driver.close()). With that in mind the error you're seeing

[...] Task-364892 [...] got Future [...] attached to a different loop might make more sense. The moment you first use the driver to execute async work, it will be bound to the event loop executing the work. I/O streams, locks, etc., all assume they belong to a singe event loop.

I'll close the issue for now as it does not appear to be an issue with the driver. But please feel free to keep commenting if you have further questions.

williamhakim10 commented 3 months ago

Hi @robsdedude I think you're correct in your analysis, but why does it work that way? The various other async services which we run (sqlalchemy, redis, etc.) are totally fine with being torn down in a different event loop from the one in which they do the work.

I understand the ideal refactor here is to set up and tear down the inside a single block of async code? But that pattern basically assumes that your code is mostly or entirely async, rather than writing mostly sync code which occasionally does async stuff.

robsdedude commented 3 months ago

The various other async services which we run (sqlalchemy, redis, etc.) are totally fine with being torn down in a different event loop from the one in which they do the work.

Here, I can only speculate, as I don't know the internals of the libraries. Maybe they are doing less/no async work at teardown. They could, for example just close the sockets without waiting. Whereas this driver tries to be a good citizen by sending a GOODBYE message to the server before shutting down the socket.

I understand the ideal refactor here is to set up and tear down the inside a single block of async code?

Yes, unless you manage the eventloop manually and pass it around. However, I see your code uses both, an async and a sync driver. But while you're running inside a coroutine (or async Task, etc.) you shouldn't be using any blocking operations like those the sync driver performs as that would block the event loop preventing other async tasks from progressing. As you wrote: mixing async and sync code is an error prone approach in general.

williamhakim10 commented 3 months ago

Yeah, definitely not doing any blocking code inside a coroutine, so that's not the concern.

The reason for declaring both a synchronous and asynchronous driver is so that configuration can happen at the app's entrypoint, rather than pushing configuration/environment variables/etc. down into random blocks of async code which may live in shared libraries or other places that don't know anything about their environment. It sounds like the only way to get around this is to manually pass around the event loop somehow?