neo4j / neo4j-python-driver

Neo4j Bolt driver for Python
https://neo4j.com/docs/api/python-driver/current/

Neo4j AuraDS on GCP: Cannot run long queries #788

Closed by Ark-kun 2 years ago

Ark-kun commented 2 years ago

We use Neo4j AuraDS on GCP: https://pantheon.corp.google.com/marketplace/product/endpoints/prod.n4gcp.neo4j.io

We have a script that regularly updates data in our Neo4j DB. The script used to work (on Neo4j AuraDB), but it no longer does now that our data has grown slightly. It raises a "Failed to read from defunct connection" error on queries that take longer to run; smaller queries still work.

Can you please help us run queries on managed Neo4j again?

Neo4j Version: 4.4.0 Enterprise
Neo4j Mode: Neo4j AuraDS on GCP
Driver version: Python driver 4.4.5

Steps to reproduce

Code:

import neo4j
neo4j_driver = neo4j.GraphDatabase.driver(
    _NEO4J_URI, auth=(_NEO4J_USER, _NEO4J_PASSWORD))
with neo4j_driver.session() as session:
    session.run(query='''
//:auto
MATCH (r:Request)
WHERE r.obscured_consumer_project_number IS NOT NULL
// Use hash to stabilize and randomize the sequence ordering for same request_time and user_id
// It's also desirable to make the ordering the same for different relationships (e.g. NEXT_PROJECT_REQUEST and NEXT_USER_REQUEST)
// Use the row id for hashing or an explicit object like `apoc.hashing.fingerprint({method: r.method, request_time: r.request_time, ...})`
WITH *, apoc.hashing.fingerprint(r.id) AS hash
WITH * ORDER BY r.request_time, hash
WITH r.obscured_consumer_project_number AS obscured_consumer_project_number, collect(r) AS requests
UNWIND apoc.coll.pairs(requests) AS request_pair
WITH request_pair[0] AS r1, request_pair[1] AS r2
WHERE r2 IS NOT NULL
CALL {
  WITH r1, r2
  MERGE (r1)-[:NEXT_PROJECT_REQUEST]->(r2)
} IN TRANSACTIONS OF 1000 ROWS
;
'''
)

Actual behavior

Applying query wipeout.cypher
Applying query NEXT_PROJECT_REQUEST_relationships.cypher
Failed to read from defunct connection IPv4Address(('7XXX.databases.neo4j.io', 7687)) (IPv4Address(('YYY', 7687)))
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
/opt/conda/lib/python3.7/site-packages/neo4j/io/_common.py in _yield_messages(self, sock)
     56                     # Determine the chunk size and skip noop
---> 57                     buffer.receive(sock, 2)
     58                     chunk_size = buffer.pop_u16()

/opt/conda/lib/python3.7/site-packages/neo4j/packstream.py in receive(self, sock, n_bytes)
    483             if n == 0:
--> 484                 raise OSError("No data")
    485             self.used += n

OSError: No data

The above exception was the direct cause of the following exception:

SessionExpired                            Traceback (most recent call last)
/tmp/ipykernel_2514/2341552839.py in <module>
----> 1 main([])

/tmp/ipykernel_2514/2512792290.py in main(***failed resolving arguments***)
     17       bq_client=bq_client,
     18       neo4j_driver=neo4j_driver,
---> 19       batch_size=1000,
     20   )

/tmp/ipykernel_2514/2580852736.py in ingest_all_data(bq_client, neo4j_driver, batch_size)
    113     print(f"Applying query {query_file}")
    114     with neo4j_driver.session() as session:
--> 115       session.run(query=pathlib.Path(query_file).read_text())

/opt/conda/lib/python3.7/site-packages/neo4j/work/simple.py in run(self, query, parameters, **kwparameters)
    216             query, parameters, self._config.database,
    217             self._config.impersonated_user, self._config.default_access_mode,
--> 218             self._bookmarks, **kwparameters
    219         )
    220 

/opt/conda/lib/python3.7/site-packages/neo4j/work/result.py in _run(self, query, parameters, db, imp_user, access_mode, bookmarks, **kwparameters)
    115         self._pull()
    116         self._connection.send_all()
--> 117         self._attach()
    118 
    119     def _pull(self):

/opt/conda/lib/python3.7/site-packages/neo4j/work/result.py in _attach(self)
    204         if self._closed is False:
    205             while self._attached is False:
--> 206                 self._connection.fetch_message()
    207 
    208     def _buffer(self, n=None):

/opt/conda/lib/python3.7/site-packages/neo4j/io/_common.py in inner(*args, **kwargs)
    186             def inner(*args, **kwargs):
    187                 try:
--> 188                     func(*args, **kwargs)
    189                 except (Neo4jError, ServiceUnavailable, SessionExpired) as exc:
    190                     self.__on_error(exc)

/opt/conda/lib/python3.7/site-packages/neo4j/io/_bolt4.py in fetch_message(self)
    281 
    282         # Receive exactly one message
--> 283         details, summary_signature, summary_metadata = next(self.inbox)
    284 
    285         if details:

/opt/conda/lib/python3.7/site-packages/neo4j/io/_common.py in __next__(self)
     81 
     82     def __next__(self):
---> 83         tag, fields = self.pop()
     84         if tag == b"\x71":
     85             return fields, None, None

/opt/conda/lib/python3.7/site-packages/neo4j/io/_common.py in pop(self)
     75 
     76     def pop(self):
---> 77         return next(self._messages)
     78 
     79 

/opt/conda/lib/python3.7/site-packages/neo4j/io/_common.py in _yield_messages(self, sock)
     72 
     73         except (OSError, socket.timeout, SocketDeadlineExceeded) as error:
---> 74             self.on_error(error)
     75 
     76     def pop(self):

/opt/conda/lib/python3.7/site-packages/neo4j/io/__init__.py in _set_defunct_read(self, error, silent)
    556             self.unresolved_address, self.server_info.address
    557         )
--> 558         self._set_defunct(message, error=error, silent=silent)
    559 
    560     def _set_defunct_write(self, error=None, silent=False):

/opt/conda/lib/python3.7/site-packages/neo4j/io/__init__.py in _set_defunct(self, message, error, silent)
    601         else:
    602             if error:
--> 603                 raise SessionExpired(message) from error
    604             else:
    605                 raise SessionExpired(message)

SessionExpired: Failed to read from defunct connection IPv4Address(('XXX.databases.neo4j.io', 7687)) (IPv4Address(('YYY', 7687)))

P.S. The DB was taking less than 30% of instance space and there were no OOM errors. In Neo4j Browser I also got "SessionExpired" errors when trying to run the query. After resizing the DB from {8, 16, 2} to {24, 48, 5}, the query finishes in the browser: "Created 8655786 relationships, completed after 430174 ms". The DB now takes <14% of storage space.
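
For scale, the two figures reported by the browser can be converted into a wall-clock duration and a rough throughput. This is only a back-of-the-envelope sketch: the two input numbers come from the message above, while the batch count assumes that every row passed to the `CALL { ... } IN TRANSACTIONS OF 1000 ROWS` subquery created exactly one relationship, which the output does not confirm.

```python
import math

# Figures quoted from the browser output above.
relationships_created = 8_655_786
elapsed_ms = 430_174

# Batch size taken from the query ("IN TRANSACTIONS OF 1000 ROWS").
batch_rows = 1000

elapsed_s = elapsed_ms / 1000
minutes, seconds = divmod(elapsed_s, 60)

# Average rate over the whole run; the real rate may vary between batches.
throughput = relationships_created / elapsed_s

# Assumption: one created relationship per input row, so this is a lower
# bound on the number of inner transactions the server committed.
batches = math.ceil(relationships_created / batch_rows)

print(f"duration: {int(minutes)} min {seconds:.1f} s")
print(f"throughput: {throughput:,.0f} relationships/s")
print(f"inner transactions (at least): {batches}")
```

So even on the larger instance the query keeps a single connection busy for a little over seven minutes, at roughly 20,000 relationships per second.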

robsdedude commented 2 years ago

Hi and thanks for reaching out.

First, it'd be helpful to get some driver debug logs. Please see the API docs on how to enable them. Feel free to replace the IP addresses, but I'd like to see the full log of that one query, including the driver start-up.
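
A minimal sketch of one way to capture such logs, using only Python's standard `logging` module. It assumes the driver emits its debug records under the `neo4j` logger name; the helper name `enable_neo4j_debug_logging` is made up for this example and is not part of the driver. The API docs remain the authoritative reference, and they also describe a `neo4j.debug.watch` helper that may be more convenient.

```python
import logging
import sys


def enable_neo4j_debug_logging(stream=sys.stderr):
    """Send DEBUG-level records from the "neo4j" logger tree to `stream`.

    Hypothetical helper for illustration; returns the handler so that it
    can be removed again later.
    """
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    # Assumption: the driver logs under the "neo4j" logger name.
    logger = logging.getLogger("neo4j")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return handler
```

Calling `enable_neo4j_debug_logging()` before `neo4j.GraphDatabase.driver(...)` is created should also capture the driver start-up, which is the part requested above.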

laeg commented 2 years ago

@Ark-kun cheers for reaching out and sorry to hear you're having trouble.

Just because I'm going to need to ask for some personal details, would you mind filing a support ticket so we can have a private conversation about the instance in question?

The easiest way is in the console navbar -> get help -> support or directly here: https://aura.support.neo4j.com/hc/en-us

Once that's done, we can start exploring :) At this point, it's really hard to know if it's the instance, our infra, the network or the driver - but we'll figure it out!

Thank you! LG

robsdedude commented 2 years ago

I'll close this issue here then. Should it boil down to a driver issue, please feel free to re-open this or open a new issue.