neo4j / neo4j-python-driver

Neo4j Bolt driver for Python
https://neo4j.com/docs/api/python-driver/current/

Async error #950

Closed FrancescoSaverioZuppichini closed 1 year ago

FrancescoSaverioZuppichini commented 1 year ago

Hi there 👋

I am trying to read files in batches and create records in Neo4j:

async def add_one(file_path: str, db_handler):
    email = await aparse_eml(file_path)
    await db_handler.add_email(email)
    print(f"{email.message_id}: Done!")

async def process_batch(file_paths: List[str], db_handler):
    tasks = [add_one(fp, db_handler) for fp in file_paths]
    await asyncio.gather(*tasks)

async def main(db_handler):
    file_paths_str = [str(fp) for fp in file_paths]

    batch_size = 100  # Adjust this value based on your system's capacity
    start = perf_counter()

    for i in range(0, len(file_paths_str), batch_size):
        batch = file_paths_str[i : i + batch_size]
        await process_batch(batch, db_handler)

    print(f"elapsed {(perf_counter() - start)*1000:.2f}ms")
    await db_handler.close()

if __name__ == "__main__":
    # main()
    db_handler = AsyncGraphDBHandler()
    asyncio.run(main(db_handler))

Here, my aparse_eml

async def aparse_eml(file_path: str):
    async with aiofiles.open(file_path, "rb") as f:
        data = await f.read()
    return parse_bytes(data)

opens a file and does some processing to obtain an Email dataclass. My AsyncGraphDBHandler is:

class AsyncGraphDBHandler:
    def __init__(
        self,
        uri: str = "neo4j://localhost:7687",
        username: str = "neo4j",
        password: str = "password",
    ) -> None:
        self.driver = AsyncGraphDatabase.driver(uri, auth=(username, password))

    async def create_person(self, tx, person: Person):
        result = await tx.run(
            """
            MERGE (p:Person {name: $name, address: $address})
            RETURN p
            """,
            name=person.name,
            address=person.address,
        )

    async def create_email(self, tx, email: Email):
        # First create all the Person
        # sender: Person
        # recipient: Person
        # cc: Optional[List[Person]]
        await self.create_person(tx, email.sender)
        await self.create_person(tx, email.recipient)
        for person in email.cc or []:
            await self.create_person(tx, person)

        # Then create the Email
        await tx.run(
            """
            MERGE (e:Email {message_id: $message_id, subject: $subject, date: $date, 
            date_unix: $date_unix, content: $content, reply_to: $reply_to, 
            previous_email_content: $previous_email_content})
            RETURN e
            """,
            message_id=email.message_id,
            subject=email.subject,
            date=email.date,
            date_unix=email.date_unix,
            content=email.content,
            reply_to=email.reply_to or "",
            previous_email_content=email.previous_email_content,
        )

        # Create relationships with sender, recipient, and cc
        await tx.run(
            """
            MATCH (e:Email), (p:Person)
            WHERE e.message_id = $message_id AND p.address = $sender_address
            MERGE (p)-[:SENT]->(e)
            """,
            message_id=email.message_id,
            sender_address=email.sender.address,
        )

        await tx.run(
            """
            MATCH (e:Email), (p:Person)
            WHERE e.message_id = $message_id AND p.address = $recipient_address
            MERGE (p)-[:RECEIVED]->(e)
            """,
            message_id=email.message_id,
            recipient_address=email.recipient.address,
        )

        for cc_person in email.cc or []:  # cc is Optional, so guard against None
            await tx.run(
                """
                MATCH (e:Email), (p:Person)
                WHERE e.message_id = $message_id AND p.address = $cc_address
                MERGE (p)-[:CC]->(e)
                """,
                message_id=email.message_id,
                cc_address=cc_person.address,
            )

        # Link to previous emails
        for ref in email.references:
            await tx.run(
                """
                MATCH (e1:Email {message_id: $message_id}), (e2:Email {message_id: $ref})
                MERGE (e1)-[:REFERENCES]->(e2)
                """,
                message_id=email.message_id,
                ref=ref,
            )

    async def add_email(self, email: Email):
        async with self.driver.session(database="neo4j") as session:
            await session.write_transaction(self.create_email, email)

I am getting this error

RuntimeError: Task <Task pending name='Task-7' coro=<add_one() running at /Users/francescozuppichini/Documents/hunter-biden-emails/main.py:19> cb=[gather.<locals>._done_callback() at /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:764]> got Future <Future pending> attached to a different loop

I think they are running in two different event loops.

While I am here: is there a way to create all the persons concurrently and efficiently? I've tried different things, but none of them worked.

Thanks a lot

Fra

robsdedude commented 1 year ago

Hello and thanks for reaching out. What version of the driver are you using?

Please try creating the driver wrapper (and with it the driver) inside the async main and let me know if this fixes it. In the meantime I'll try to reproduce the error.

...

async def main():
    db_handler = AsyncGraphDBHandler()
    ...

if __name__ == "__main__":
    asyncio.run(main())

This issue reminds me a lot of https://github.com/neo4j/neo4j-python-driver/issues/868 and I thought I fixed it 🤔
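
A minimal sketch of the suggestion above that runs without a Neo4j server. LoopBoundHandler is a hypothetical stand-in for AsyncGraphDBHandler (it is not part of the driver's API); it holds an asyncio.Lock only to mimic an object that owns loop-bound resources. The point it illustrates is simply that the handler is constructed inside the coroutine passed to asyncio.run, so anything it creates sees the running loop.

```python
import asyncio
from typing import List


class LoopBoundHandler:
    """Hypothetical stand-in for AsyncGraphDBHandler; not the driver's API."""

    def __init__(self) -> None:
        # Mimics a resource that may bind to an event loop when constructed.
        self._lock = asyncio.Lock()
        self.stored: List[str] = []

    async def add_email(self, message_id: str) -> None:
        async with self._lock:
            await asyncio.sleep(0)  # placeholder for the database round trip
            self.stored.append(message_id)


async def main(message_ids: List[str]) -> List[str]:
    # Constructed inside the coroutine, i.e. while the loop started by
    # asyncio.run() is already running.
    handler = LoopBoundHandler()
    await asyncio.gather(*(handler.add_email(m) for m in message_ids))
    return handler.stored


if __name__ == "__main__":
    print(asyncio.run(main(["a", "b", "c"])))
```

With the real handler, the same shape would apply: build AsyncGraphDBHandler inside main and close it there before main returns.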

robsdedude commented 1 year ago

Managed to reproduce it using Python 3.7.

It is indeed the same issue as the one I linked in the previous comment, and it does not come from within the driver but from how asyncio implements synchronization primitives. Up to and including Python 3.9, they use get_event_loop, which creates a new event loop if there is no running loop. So there are a few options forward:

robsdedude commented 1 year ago

I will close the issue as it doesn't appear to be a bug in the driver. Please feel free to keep commenting if you have further questions.
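
That this class of error can arise from asyncio alone, with no driver involved, can be illustrated with the standard library. The sketch below does not reproduce the driver's internals; it deliberately creates a future on one loop and awaits it from a task running on another, which is the same mismatch the traceback in the original report describes. The names wait_for and reproduce are made up for this example.

```python
import asyncio


async def wait_for(fut: asyncio.Future) -> None:
    # Awaiting a pending future that belongs to another loop makes the
    # running task raise RuntimeError.
    await fut


def reproduce() -> str:
    # A loop that is created but never run, standing in for the implicit
    # loop that older asyncio versions could create outside asyncio.run().
    foreign_loop = asyncio.new_event_loop()
    fut = foreign_loop.create_future()
    try:
        asyncio.run(wait_for(fut))  # asyncio.run() starts its own, different loop
    except RuntimeError as exc:
        return str(exc)
    finally:
        foreign_loop.close()
    return "no error"


if __name__ == "__main__":
    print(reproduce())
```

The returned message ends with "attached to a different loop", matching the wording of the error reported at the top of this thread.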

FrancescoSaverioZuppichini commented 1 year ago

Thanks a lot. I need to check (I'm not at my home PC), but I think I was using Python 3.9. I will try with Python 3.10 and let you know.