neo4j / neo4j-spark-connector

Neo4j Connector for Apache Spark, which provides bi-directional read/write access to Neo4j from Spark, using the Spark DataSource APIs
https://neo4j.com/developer/spark/
Apache License 2.0

partitions: 8 leads to inefficient COUNT wrapper #641

Open pascalwhoop opened 4 months ago

pascalwhoop commented 4 months ago

Expected Behavior (Mandatory)

Setting partitions: 8 should not lead to performance degradation on read (https://neo4j.com/docs/spark/current/read/options/).

Actual Behavior (Mandatory)

The query

      MATCH (n: Entity) RETURN n.id as id, n.embedding as embedding

is converted into

      CALL { MATCH (n: Entity) RETURN n.id as id, n.embedding as embedding }
      RETURN count(*) AS count

This means it retrieves all the data from the database, counts the rows, calculates the partition sizes, and then reads everything again, rather than directly running a count(*) (which is much faster).

Is this because we're using the Spark 'query'-based reading?
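For reference, a minimal PySpark sketch of the kind of read that hits this path (URL and credentials are placeholders, not from the original report):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://localhost:7687")            # placeholder connection
        .option("authentication.basic.username", "neo4j")   # placeholder credentials
        .option("authentication.basic.password", "password")
        .option("query", "MATCH (n: Entity) RETURN n.id as id, n.embedding as embedding")
        .option("partitions", "8")  # this is what triggers the CALL { ... } RETURN count(*) wrapper
        .load()
    )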

ahxxm commented 4 months ago

A workaround I'm using is to pass a count query via read_options["query.count"] = "MATCH (n: Entity) RETURN COUNT(*) as count"; the connector then uses the count store to get the node count in O(1). This is especially needed when scanning relationships, because the count store only supports a relationship type plus at most one node label for O(1) access.

Additionally, the connector converts each partition into a SKIP + LIMIT query (so each executor still has to SKIP past the preceding rows before reading its LIMIT), and the resulting DataFrame can contain duplicates when your database is also receiving writes.
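A minimal sketch of that workaround in PySpark (connection options omitted for brevity; query text mirrors the example above):

    read_options = {
        "query": "MATCH (n: Entity) RETURN n.id as id, n.embedding as embedding",
        # the count query is answered by Neo4j's count store in O(1),
        # instead of wrapping the main query in CALL { ... } RETURN count(*)
        "query.count": "MATCH (n: Entity) RETURN COUNT(*) as count",
        "partitions": "8",
    }

    df = (
        spark.read.format("org.neo4j.spark.DataSource")
        .options(**read_options)
        .load()
    )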

pascalwhoop commented 4 months ago

Interesting, thanks for sharing that. We ended up filtering using the labels option instead and got better results that way. I guess at the moment the queries are treated as a string rather than decomposed into a query that can be optimized by a query planner, as e.g. Spark does it.
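A sketch of the labels-based read mentioned above (assuming the Entity label and that id and embedding are node properties, as in the original query):

    df = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Entity")   # node-by-label read instead of a raw Cypher query
        .option("partitions", "8")
        .load()
        .select("id", "embedding")    # assumes these node properties exist
    )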


ahxxm commented 4 months ago

Yeah, the connector just wraps simple operations without heuristics. By "labels parameter" do you mean some indexed fields that enable a cursor-based query?

fbiville commented 4 months ago

I guess at the moment the queries are treated as a string rather than decomposed into a query that can be optimized by a query planner, as e.g. Spark does it

That's correct, the Spark connector currently treats Cypher queries as black boxes.