neo4j / neo4j-spark-connector

Neo4j Connector for Apache Spark, which provides bi-directional read/write access to Neo4j from Spark, using the Spark DataSource APIs
https://neo4j.com/developer/spark/
Apache License 2.0

Spark Connector sends FloatArray as String #666

Open piotrkan opened 4 weeks ago

piotrkan commented 4 weeks ago

Describe the bug

I am using node2vec to generate graph embeddings for my graph. I use the following code and it works fine: it successfully saves the embeddings in the graph stored in Neo4j under the 'topological_embeddings' property name.

gds.node2vec.write(graph, writeProperty='topological_embeddings')

But when I read the graph back in the next step, while doing dimensionality reduction (df is a Spark DataFrame with the nodes and topological embeddings as columns):

df = df.withColumn("features", array_to_vector('topological_embeddings'))

I get the following error:

pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "STRING".  
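For what it's worth, array_to_vector (from pyspark.ml.functions) requires an array column, so a string column produces exactly this error. If the column really does arrive as a JSON-style string like "[0.1, 0.2, ...]", parsing it back into an array first works around the symptom. A sketch, not a root-cause fix; the column name matches my snippet above:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.functions import array_to_vector

# Workaround sketch: parse the stringified embedding back into an array
# of doubles before converting it to an ML vector. Assumes the string is
# a JSON-style array such as "[0.1, 0.2, ...]".
df = df.withColumn(
    "features",
    array_to_vector(F.from_json("topological_embeddings", ArrayType(DoubleType()))),
)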

This does not happen when I use GraphSAGE and write the embeddings in the following way:

model, attr = gds.beta.graphsage.train(graph)
model = gds.model.get('topological_embeddings')
model.predict_write(graph, writeProperty='topological_embeddings')

Have you experienced anything like this? It seems like the .write() function in the Python client saves the embeddings as strings?

graphdatascience library version:
GDS plugin version: 2.7.0
Python version: 3.11
Neo4j version: 5.21.0
Operating system: macOS

jjaderberg commented 3 weeks ago

Hi @piotrkan :wave:

Can you share a fuller example? What happens between your gds.node2vec.write and df.withColumn calls? How do you get the dataframe from Neo4j?

Node2Vec in write mode writes arrays of 32-bit floating point values.
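One quick way to check what the Python driver itself gets back is to inspect the deserialized value types directly. A sketch, where the URI, credentials, and property name are assumptions based on your snippet:

from neo4j import GraphDatabase

# Sketch: inspect how the Python driver deserializes the written property.
driver = GraphDatabase.driver("bolt://127.0.0.1:7687", auth=("user", "password"))
with driver.session() as session:
    record = session.run(
        "MATCH (n) WHERE n.topological_embeddings IS NOT NULL "
        "RETURN n.topological_embeddings AS emb LIMIT 1"
    ).single()
    # Expect <class 'list'> and <class 'float'> if the values round-trip as numbers
    print(type(record["emb"]), type(record["emb"][0]))
driver.close()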

piotrkan commented 3 weeks ago

Hi @jjaderberg, sorry for the delayed reply!

I reproduced the error with the following code. I now realize that the problem is not in writing the embeddings but in reading them with pyspark and Neo4j. Do you have any idea what's causing the issue?

from graphdatascience import GraphDataScience
from neo4j import GraphDatabase
from pyspark.sql import SparkSession

uri = "bolt://127.0.0.1:7687"
driver = GraphDatabase.driver(uri, auth=("user", "password"))

with driver.session() as session:
    result = session.run("""
    CREATE (alice:Person {name: 'Alice'})
    CREATE (bob:Person {name: 'Bob'})
    CREATE (carol:Person {name: 'Carol'})
    CREATE (dave:Person {name: 'Dave'})
    CREATE (eve:Person {name: 'Eve'})
    CREATE (guitar:Instrument {name: 'Guitar'})
    CREATE (synth:Instrument {name: 'Synthesizer'})
    CREATE (bongos:Instrument {name: 'Bongos'})
    CREATE (trumpet:Instrument {name: 'Trumpet'})

    CREATE (alice)-[:LIKES]->(guitar)
    CREATE (alice)-[:LIKES]->(synth)
    CREATE (alice)-[:LIKES]->(bongos)
    CREATE (bob)-[:LIKES]->(guitar)
    CREATE (bob)-[:LIKES]->(synth)
    CREATE (carol)-[:LIKES]->(bongos)
    CREATE (dave)-[:LIKES]->(guitar)
    CREATE (dave)-[:LIKES]->(synth)
    CREATE (dave)-[:LIKES]->(bongos);

    """)
with driver.session() as session:
    result = session.run("""
        CALL gds.graph.project(
            'myGraph',
            ['Person', 'Instrument'], // Node labels
            {
                LIKES: { orientation: 'UNDIRECTED' }
            }
        )
        YIELD graphName, nodeCount, relationshipCount
        RETURN graphName, nodeCount, relationshipCount
        """)

gds = GraphDataScience("bolt://127.0.0.1:7687", auth=('user', 'password'))
G = gds.graph.get('myGraph')

attr = gds.node2vec.write(G=G, writeProperty='topological_embedding')

topo_list = []
with driver.session() as session:
    result = session.run("""MATCH (n) RETURN n.topological_embedding as topological_embedding
        """)
    for record in result:
        topo_list.append(record['topological_embedding'])

spark = SparkSession.builder \
    .appName("Neo4j-Spark-Connector") \
    .config("spark.neo4j.url", 'bolt://127.0.0.1:7687') \
    .config("spark.jars", "gcs-connector-hadoop3-2.2.2-shaded.jar") \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-3.5-bigquery:0.39.0,org.neo4j:neo4j-connector-apache-spark_2.12:5.3.0_for_spark_3')\
    .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')\
    .getOrCreate()

# Connector authentication options (credentials as in the driver setup above)
neo4j_credentials = {
    "authentication.basic.username": "user",
    "authentication.basic.password": "password",
}
load_args = {"partitions": 16, "labels": "Person"}

# Load data from Neo4j using the `labels` option
df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("labels", "Person") \
    .option("partitions", 16) \
    .option("url", 'bolt://127.0.0.1:7687')\
    .options(**neo4j_credentials)\
    .options(**load_args).load()
print(df)

This returns the following DataFrame, where topological_embedding is a string, not an array:

DataFrame[<id>: bigint, <labels>: array<string>, topological_embedding: string, name: string]
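The inferred type can also be confirmed directly on the DataFrame, e.g.:

# Confirm what the connector's schema inference produced for the column
df.printSchema()
df.select("topological_embedding").show(1, truncate=False)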
jjaderberg commented 2 weeks ago

@piotrkan I agree that your reproducer suggests something goes wrong reading the float array node properties via pyspark. I have raised it with the maintainers of the Neo4j Spark Connector.

If you want to work around the problem, you should be able to convert the float array property values to double array values with the toFloatList Cypher function:

result = session.run("MATCH (n) RETURN toFloatList(n.topological_embedding) as topological_embedding")

For the label-based loading with pyspark and the Spark Connector, you can overwrite the property with the type-converted double array value:

MATCH (n:Person) SET n.topological_embedding = toFloatList(n.topological_embedding)

If you do this before reading the Person label with pyspark, it should succeed (assuming our hypothesis is correct).
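Putting those two pieces together, a query-based read that applies the conversion on the fly might look like this. A sketch, reusing spark and neo4j_credentials from your reproducer; I haven't verified it against the connector:

# Sketch: read via an explicit Cypher query so toFloatList() converts the
# 32-bit float array to Cypher's 64-bit floats before Spark infers a schema.
df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "bolt://127.0.0.1:7687")
    .options(**neo4j_credentials)
    .option(
        "query",
        "MATCH (n:Person) "
        "RETURN n.name AS name, "
        "toFloatList(n.topological_embedding) AS topological_embedding",
    )
    .load()
)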

N.B. "float" in the function name "toFloatList" refers to Cypher floating point type, which is 64 bits. Node2Vec writes 32 bit floating point values for smaller memory footprint. Some languages use "float" for the 32-bit type and "double" for the 64-bit type. Neo4j supports storing both of these types and the Cypher runtime can also handle both. But in the Cypher language there is only a single floating point type and it is 64 bits.

It's unfortunate that the Node2Vec result couldn't be consumed via pyspark. Hopefully the Connector maintainers can confirm or debunk our hypothesis and address the problem if it is indeed missing support in the Connector.

ali-ince commented 4 days ago

Hello @piotrkan,

Thanks for reporting this. I've tried your reproducer, but what I get back is always an array<double> with pyspark:

DataFrame[<id>: bigint, <labels>: array<string>, topological_embedding: array<double>, name: string]

with similar results using Spark through the JVM:

root
 |-- <id>: long (nullable = false)
 |-- <labels>: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- topological_embedding: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- name: string (nullable = true)

I have one theory, though. Is it possible that some nodes in your graph have a topological_embedding property typed as a string? The Spark connector derives schema information by sampling, so that seems like a possible explanation to me. If that's the case, I suggest you define your schema beforehand, as shown here.
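For reference, a user-defined schema for the label-based read could look something like this. A sketch; .schema() is the standard Spark reader API, and the column names match the DataFrame output shown earlier in this thread:

from pyspark.sql.types import (
    ArrayType, DoubleType, LongType, StringType, StructField, StructType,
)

# Sketch: pin the embedding column to array<double> instead of relying on
# the connector's sampling-based schema inference.
schema = StructType([
    StructField("<id>", LongType()),
    StructField("<labels>", ArrayType(StringType())),
    StructField("topological_embedding", ArrayType(DoubleType())),
    StructField("name", StringType()),
])

df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .schema(schema)
    .option("labels", "Person")
    .option("url", "bolt://127.0.0.1:7687")
    .options(**neo4j_credentials)
    .load()
)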

I'll keep the issue open for a few days until I hear back from you.