neo4j / neo4j-spark-connector

Neo4j Connector for Apache Spark, which provides bi-directional read/write access to Neo4j from Spark, using the Spark DataSource APIs
https://neo4j.com/developer/spark/
Apache License 2.0

Neo4jGraph.saveGraph extension for CREATE/MERGE #15

Closed (sjishan closed 7 years ago)

sjishan commented 8 years ago

Hi, the saveGraph function supports the update operation. I was wondering whether you have made any attempt to create a new Neo4j graph from an Apache Spark dataframe.

jexp commented 8 years ago

Great idea, any suggestion on what that could look like?

Would you do only an edge list?

Or one df per label and one per rel-type?

sjishan commented 8 years ago

Yes, like an edge list. It would be like the LOAD CSV command of Neo4j, where we can create nodes from the rows of the CSV file and create edges between them.

I tried to do something like that using your code, but without success. In this case I had a dataframe with two columns, X and Y. [screenshot]

jexp commented 8 years ago

I wrote this, which works so far. Is the API sensible?

  def mergeEdgeList(sc: SparkContext, dataFrame: DataFrame, source: (String,Seq[String]), relationship: (String,Seq[String]), target: (String,Seq[String])): Unit = {
    val mergeStatement = s"""
      UNWIND {rows} as row
      MERGE (source:`${source._1}` {`${source._2.head}` : row.source.`${source._2.head}`}) ON CREATE SET source += row.source
      MERGE (target:`${target._1}` {`${target._2.head}` : row.target.`${target._2.head}`}) ON CREATE SET target += row.target
      MERGE (source)-[rel:`${relationship._1}`]->(target) ON CREATE SET rel += row.relationship
      """
    val partitions = Math.max(1,(dataFrame.count() / 10000).asInstanceOf[Int])
    val config = Neo4jConfig(sc.getConf)
    dataFrame.repartition(partitions).foreachPartition( rows => {
      val params: AnyRef = rows.map(r =>
        Map(
          "source" -> source._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
          "target" -> target._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
          "relationship" -> relationship._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava)
          .asJava).asJava
      execute(config, mergeStatement, Map("rows" -> params).asJava)
    })
  }

    val rows = sc.makeRDD(Seq(Row("Keanu", "Matrix")))
    val schema = StructType(Seq(StructField("name", DataTypes.StringType), StructField("title", DataTypes.StringType)))
    val df = new SQLContext(sc).createDataFrame(rows, schema)
    Neo4jDataFrame.mergeEdgeList(sc, df, ("Person",Seq("name")),("ACTED_IN",Seq.empty),("Movie",Seq("title")))
    val edges : RDD[Edge[Long]] = sc.makeRDD(Seq(Edge(0,1,42L)))
    val graph = Graph.fromEdges(edges,-1)
    assertEquals(2, graph.vertices.count)
    assertEquals(1, graph.edges.count)
    Neo4jGraph.saveGraph(sc,graph,null,"test")

sjishan commented 8 years ago

Thank you for the prompt response. The code is clearly understandable. However, I got an error in the execute function again. Is the execute function you are using the same as the one provided in the main branch here? [screenshot]

jexp commented 8 years ago

I had a local variant.

I think I have to sit down for a day or two to redesign the APIs

Michael

sjishan commented 8 years ago

No problem. Looking forward to the new version. Meanwhile, will it be possible for you to provide your local copy of execute function here? Thanks.

jexp commented 8 years ago

I pushed the new versions, including one for Spark 2.0, which contain these changes.

I have to go over the APIs again to make them friendlier. Please check and report back.

sjishan commented 8 years ago

Thank you, it is working fine. However, I ran into a scalability issue. I repartitioned a huge dataframe that will build around 4 million nodes and 3 million edges, and ran the task on 8 cluster nodes. This seems to create a bottleneck: after I stopped the execution on the Spark side, the graph kept populating for the next 10-15 minutes (the node count kept increasing).

I stopped the execution because it was quite slow.

I was wondering if there is any major difference in write scalability in Community Edition vs. Enterprise Edition.

For instance, in this case I had 340 partitions. The first 32 were fast, but then it got very slow. [screenshot]

Here the first count was taken just before I cancelled the execution and the second count was taken 10 minutes later. [screenshot]

jexp commented 8 years ago

What exactly did you call?

It parallelizes across the 8 workers in spark and the threads in Neo4j.

I think it might be even faster to run within a single thread, as the concurrent MERGE operations create locks that have to wait on each other.

Can you try to run it with a single partition?

sjishan commented 8 years ago

Yes, I have 8 worker nodes on Spark. I tried different partitioning strategies, starting with the one you have set: val partitions = Math.max(1,(df.count() / 1000).asInstanceOf[Int]). I felt this was the best way to partition: df.select("actor", "movie").distinct().repartition($"actor", $"movie").
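For reference, the partition-count expression can be checked in isolation. The sketch below is plain Scala with no Spark dependency; `partitionCount` is a hypothetical helper name and the row counts are illustrative, not taken from the thread. It writes the same arithmetic with `.toInt`.

```scala
object PartitionCount {
  // Long integer division, narrowed to Int, with a floor of one partition.
  def partitionCount(rowCount: Long, batchSize: Long): Int =
    math.max(1, (rowCount / batchSize).toInt)
}
```

With 4,000,000 rows, a divisor of 10000 gives 400 partitions, while a divisor of 1000 gives 4000, so the two divisors that appear in this thread differ by a factor of ten in the number of concurrent write tasks.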

I have tried a single partition, but it was not very effective (and it is also not feasible for what I am going to do). One interesting thing I noticed today is that the first 100k nodes took about 2 minutes, while the next 100k took about 1 hour.

I am planning to have around 2 billion nodes in the database. In addition, around 40 million nodes and 30 million edges will be deleted and added per day. Being able to process 40 million nodes and 30 million edges in a reasonable amount of time (a few hours) is therefore important for me.

sjishan commented 8 years ago

Is it possible to have an option to get rid of the write lock during concurrent MERGE operations? Also, if I populate the database with the distinct nodes first (asserting a constraint) and then the edges, will that improve the situation?

jexp commented 8 years ago

Do you need to guarantee uniqueness? If you only want to create data, you can switch from MERGE to CREATE.

In general I think we might switch it to generating the nodes first in parallel. And then doing batches (perhaps 10k per batch) of relationships with retry if it fails.

Usually Neo4j can process quite a lot of data concurrently. The only problem kicks in when concurrent processes write to the same data, so that they have to wait for each other, or even deadlock and have to retry.

jexp commented 8 years ago

So, roughly:

  1. select source node + properties from the DF, partition in batches of 100k and write them to Neo4j
  2. in parallel, select target node + properties from the DF, partition in batches of 100k and write them to Neo4j
  3. after 1. and 2. are done, group relationships by source and do 1k groups per batch, retrying the transaction when it fails with a deadlock

Alternatively, sort by min(source, target), then partition and batch that.

I'll try to find some time next week to implement these.
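The batching in the plan above can be sketched on plain Scala collections. This is only an illustration under stated assumptions, not the connector's code: `Rel`, `nodeBatches` and `relBatches` are hypothetical names, nodes are reduced to a single string key, and the actual writes to Neo4j are left out.

```scala
object WritePlan {
  final case class Rel(source: String, target: String)

  // Steps 1 and 2: distinct node keys, cut into fixed-size batches.
  def nodeBatches(keys: Seq[String], batchSize: Int): Seq[Seq[String]] =
    keys.distinct.grouped(batchSize).toList

  // Step 3: group relationships by source node, then put a fixed number of
  // groups into each batch, so one source never spans two batches.
  def relBatches(rels: Seq[Rel], groupsPerBatch: Int): Seq[Seq[Rel]] =
    rels.groupBy(_.source).toList
      .sortBy(_._1)            // deterministic order, for the example only
      .map(_._2)
      .grouped(groupsPerBatch)
      .map(_.flatten)
      .toList
}
```

Grouping by source keeps each source node inside one batch, but two batches can still share a target node, which is why the plan keeps a retry for deadlocked transactions.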

sjishan commented 8 years ago

Thanks. That will be great. Locking is a big issue for me now because the whole job gets aborted while creating the edges. This happens even though I am using just one cluster node (one slave and one master). Attached is a screenshot. It would be great if you could suggest how to handle it.

[screenshot]

jexp commented 8 years ago

@sjishan I finished the loading builder API over the weekend and would love your feedback. Writing would be next.

See: https://github.com/neo4j-contrib/neo4j-spark-connector#builder-api

sjishan commented 8 years ago

Great! Thanks! I will try it and get back to you.

For the write:

TransactionTemplate template = new TransactionTemplate(  ).retries( 5 ).backoff( 3, TimeUnit.SECONDS );

Object result = template.with(graphDatabaseService).execute( transaction -> {
    Object result1 = null;
    return result1;
} );

http://neo4j.com/docs/java-reference/current/#transactions-deadlocks

Can something similar to this be implemented?
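A retry-with-backoff wrapper in the spirit of the TransactionTemplate snippet could look roughly like this on the Scala side. It is a generic sketch, not part of the connector: `withRetries` and `isTransient` are hypothetical names, and in practice the predicate would presumably test for the driver's transient (deadlock) exception.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Retry {
  // Runs `body`; when it fails with an error that `isTransient` accepts,
  // sleeps `backoffMillis` and tries again, up to `retries` more times.
  @tailrec
  def withRetries[A](retries: Int, backoffMillis: Long)
                    (isTransient: Throwable => Boolean)
                    (body: => A): A =
    Try(body) match {
      case Success(value) => value
      case Failure(e) if retries > 0 && isTransient(e) =>
        Thread.sleep(backoffMillis)
        withRetries(retries - 1, backoffMillis)(isTransient)(body)
      case Failure(e) => throw e // not transient, or retries exhausted
    }
}
```

The block passed as `body` would be the per-partition write, for example the call that sends one batch of rows to Neo4j, so a deadlocked batch is retried without failing the whole Spark task.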

jexp commented 8 years ago

Yep, good idea.

It should probably also split larger transactions into smaller ones, which are less likely to deadlock.

So perhaps add also a splitTo() / splitBy() option that determines into which smaller segments / partitions the data is split.

In general we should make sure that we write nodes first and rels second. Not sure how to make those dependencies clear in Spark though.
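One way to read the splitting idea is to halve a batch only after it has failed. That reading is an assumption of this sketch, since the thread does not say how splitTo() / splitBy() would behave. `writeSplitting` is a hypothetical name, and `write` stands in for a transactional, all-or-nothing write of one batch.

```scala
import scala.util.{Failure, Success, Try}

object SplitWrite {
  // Tries to write `rows` in one transaction; on failure, halves the batch
  // and writes each half separately. Returns the batches that were written,
  // in order. A single row that still fails is rethrown to the caller.
  def writeSplitting[A](rows: Vector[A])(write: Vector[A] => Unit): Vector[Vector[A]] =
    if (rows.isEmpty) Vector.empty
    else Try(write(rows)) match {
      case Success(_) => Vector(rows)
      case Failure(e) if rows.size == 1 => throw e
      case Failure(_) =>
        val (left, right) = rows.splitAt(rows.size / 2)
        writeSplitting(left)(write) ++ writeSplitting(right)(write)
    }
}
```

This only behaves correctly if a failed `write` leaves nothing behind, which holds when each call runs in its own transaction that is rolled back on error.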

sjishan commented 8 years ago

I would like to bring one more thing to your attention.

Whenever the task fails due to a deadlock, the database freezes. If I restart Neo4j and try to write a different dataframe, the task fails instantly. The issue is only resolved when I delete the data folder.

sjishan commented 8 years ago

Hi Michael,

Is there any update with the lock issue?

kalyanjanaki commented 7 years ago

Do we have any update on this? Is the final code ready to be uploaded?

sjishan commented 7 years ago

Hi Michael, is there any update regarding the lock problem?

jexp commented 7 years ago

Hi @sjishan @kalyanjanaki

I did some work on it this weekend, adding a saveGraph method to Neo4j and extending the Neo4jGraph.saveGraph() with options to match/merge(boolean flag) on label+property combinations, and MATCH/MERGE relationships.

This is a two part answer:

Part 1

as I said before it would make most sense to

Part 2

But I feel that it's getting more complicated to specify more and more options as parameters to the save method than perhaps working out concrete use-cases and creating dedicated methods for each of them, which could then also handle the distinctness and retry-concerns (e.g. with smaller batches) in case of deadlock.

Should the method do the distinct operation or leave it to the user?

Can you help me figure them out?

// will do merge or match

// match on nodes (by node-id or label+prop)

// match, create, merge, delete of relationship

jexp commented 7 years ago

@sjishan if the database freezes, could you please check the neo4j.log / debug.log and see if there is any output at the end (blocking operations, errors, etc) and share them here?

sjishan commented 7 years ago

Well.

  1. Usually there is no failure while inserting the nodes because I take the distinct values from the dataframe. The problem only happens when I am creating the relationship. I handle node creation and edge creation separately.

  2. While creating edges I try to repartition by the source and target nodes in order to avoid deadlocks as much as possible. However, no matter how you repartition, it is not always possible to avoid a deadlock. I think this is the problem, and it can only be resolved if the deadlock can be handled without a timeout.

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 223.0 failed 4 times, most recent failure: Lost task 7.3 in stage 223.0 (TID 3828, 10.233.229.240, executor 0): org.neo4j.driver.v1.exceptions.TransientException: LockClient[144445] can't wait on resource RWLock[NODE(11734985), hash=680169680] since => LockClient[144445] <-[:HELD_BY]- RWLock[NODE(17975565), hash=396977757] <-[:WAITING_FOR]- LockClient[144440] <-[:HELD_BY]- RWLock[NODE(11734985), hash=680169680]

sjishan commented 7 years ago

Here is the portion of the debug.log when the failure happened.

debuglog.txt

sjishan commented 7 years ago

Hi @jexp Thank you for suggesting to use connected component. It seems partitioning by that solves the problem of deadlock.

GreGGus commented 6 years ago

@sjishan I have the exact same issue.

What do you mean by "connected component"?
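The thread does not show the original suggestion, so the following is only one plausible reading: compute the connected components of the edge list and keep all edges of one component in the same partition, so that no two concurrently running transactions touch the same node. On Spark, GraphX provides `connectedComponents()` for this. The sketch below shows the idea with a small union-find in plain Scala; `componentOf` and `edgesByComponent` are hypothetical names.

```scala
import scala.collection.mutable

object Components {
  // Union-find over node ids: maps every node that appears in an edge to
  // the smallest node id in its connected component.
  def componentOf(edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val parent = mutable.Map.empty[Long, Long]

    def find(x: Long): Long = {
      var root = x
      while (parent.getOrElseUpdate(root, root) != root) root = parent(root)
      root
    }

    // Always attach the larger root to the smaller one, so each root is
    // the minimum id of its component.
    def union(a: Long, b: Long): Unit = {
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(math.max(ra, rb)) = math.min(ra, rb)
    }

    edges.foreach { case (s, t) => union(s, t) }
    parent.keySet.toList.map(n => n -> find(n)).toMap
  }

  // Edges keyed by component id: edges in different groups share no node.
  def edgesByComponent(edges: Seq[(Long, Long)]): Map[Long, Seq[(Long, Long)]] = {
    val comp = componentOf(edges)
    edges.groupBy { case (s, _) => comp(s) }
  }
}
```

The trade-off is parallelism: if most of the graph forms one large component, most edges end up in a single partition and are written sequentially.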

ksceriath commented 4 years ago

Hi @jexp ,

I see a mention of operation=CREATE/MATCH/MERGE/DELETE in the parameters of the updateRelationships and updateNodes methods, in your comment above.

Was this implemented/ released? I could not find any references to it in the current code.

edit: I am specifically looking for a way to do a DELETE operation. I have a similar use case as @sjishan mentioned above : recurring (e.g. daily) create and delete operations to an existing graph.
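The thread does not say whether such an operation parameter was ever released. Independently of the connector, a batched relationship delete can be expressed in Cypher in the same UNWIND style as the merge statement earlier in this issue. The sketch below only builds the statement string; `deleteRels` and the `row.source` / `row.target` parameter layout are hypothetical, and it uses the `$rows` parameter syntax rather than the older `{rows}` form.

```scala
object DeleteStatement {
  // Builds a parameterised Cypher statement that deletes one relationship
  // per input row. Labels, relationship type and key names are supplied by
  // the caller; each row is expected to carry `source` and `target` keys.
  def deleteRels(sourceLabel: String, sourceKey: String,
                 relType: String,
                 targetLabel: String, targetKey: String): String =
    "UNWIND $rows AS row\n" +
      s"MATCH (source:`$sourceLabel` {`$sourceKey`: row.source})" +
      s"-[rel:`$relType`]->" +
      s"(target:`$targetLabel` {`$targetKey`: row.target})\n" +
      "DELETE rel"
}
```

For example, `DeleteStatement.deleteRels("Person", "name", "ACTED_IN", "Movie", "title")` yields a statement that could be run once per batch of rows. Deleting nodes as well would need a separate statement, since a node cannot be deleted while it still has relationships.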