G-Research / spark-dgraph-connector

A connector for Apache Spark and PySpark to Dgraph databases.
Apache License 2.0

Question about subgraphs/filtering #152

Open daveaitel opened 2 years ago

daveaitel commented 2 years ago

Let's say I had a very large graph of IDs->commited->CommitNodes->modified->FileNodes, where my edges are reversible.

CommitNodes have an author_date that is a datetime type in the Dgraph schema. (IDs and FileNodes do not.)

If I want to select from my graph a subgraph of all the CommitNodes for a particular month, and all the FileNodes and IDs that they link to, I'm not sure whether I can with the current API. Or can I?

Thanks in advance! -dave

EnricoMi commented 2 years ago

Partially. The best you can do is load all committed and modified edges plus all node ids whose author_date falls in your date range, and then filter the edges by joining them with those node ids. You can't get better than that with the current connector implementation.

Assuming only CommitNodes have author_date, you could do

import java.sql.Timestamp

import org.apache.spark.sql.DataFrame
import spark.implicits._
import uk.co.gresearch.spark.dgraph.connector._

def read(first: Timestamp, last: Timestamp): DataFrame = {
  val edges = spark.read.dgraph.edges("localhost:9080")
  val nodes = spark.read.dgraph.nodes("localhost:9080")

  val committedEdges = edges.where($"predicate" === "committed")
  val modifiedEdges = edges.where($"predicate" === "modified")
  // uids of CommitNodes whose author_date falls into [first, last]
  val commitNodeIds = nodes.where(($"predicate" === "author_date") && $"objectTimestamp".between(first, last))

  Seq(
    committedEdges.join(commitNodeIds, committedEdges("objectUid") === commitNodeIds("subject"), "leftsemi"),
    modifiedEdges.join(commitNodeIds, modifiedEdges("subject") === commitNodeIds("subject"), "leftsemi")
  ).reduce(_.union(_))
}

This assumes that the Left Semi join pushes the "subject" projection down to the nodes DataFrame.
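
For anyone following along in PySpark (as later in this thread), a rough, untested translation of the Scala sketch above might look like the following. The column names come from that snippet, and read_commit_subgraph is only an illustrative helper name, not connector API:

from pyspark.sql import functions as F
from gresearch.spark.dgraph.connector import *


def read_commit_subgraph(spark, first, last):
    # edges touching CommitNodes whose author_date falls in [first, last]
    edges = spark.read.dgraph.edges("localhost:9080")
    nodes = spark.read.dgraph.nodes("localhost:9080")

    committed = edges.where(F.col("predicate") == "committed")
    modified = edges.where(F.col("predicate") == "modified")
    commit_ids = nodes.where(
        (F.col("predicate") == "author_date")
        & F.col("objectTimestamp").between(first, last)
    ).select("subject")

    # left-semi joins keep only the edge columns, filtered by the CommitNode ids
    committed_in_range = committed.join(
        commit_ids, committed["objectUid"] == commit_ids["subject"], "left_semi")
    modified_in_range = modified.join(
        commit_ids, modified["subject"] == commit_ids["subject"], "left_semi")
    return committed_in_range.unionByName(modified_in_range)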

EnricoMi commented 2 years ago

If you are calling that read for many different months, you could also read the entire committed-modified subgraph, enrich it with author date and store the data in partitioned parquet files:

import org.apache.spark.sql.functions.date_format
import spark.implicits._
import uk.co.gresearch.spark.dgraph.connector._

val edges = spark.read.dgraph.edges("localhost:9080")
val nodes = spark.read.dgraph.nodes("localhost:9080")

val committedEdges = edges.where($"predicate" === "committed")
val modifiedEdges = edges.where($"predicate" === "modified")
val authorDates = nodes.where($"predicate" === "author_date").select($"subject".as("node_id"), $"objectTimestamp".as("author_date"))
val authorMonths = authorDates.select($"node_id", date_format($"author_date", "yyyy-MM").as("author_month"))

// attach the author month to each edge via its CommitNode end (objectUid for committed, subject for modified)
val edgesWithAuthorDate = Seq(
  committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid"), "objectUid"),
  modifiedEdges.join(authorMonths.withColumnRenamed("node_id", "subject"), "subject")
).reduce(_.unionByName(_))

edgesWithAuthorDate.write.partitionBy("author_month").parquet("commit-modified-edges")

Then you can read your sub-graph by calling:

spark.read.parquet("commit-modified-edges").where($"author_month" === "2020-01") 

This will read only relevant edges.

I recommend using DataFrame.writePartitionedBy from our spark-extension package to write sensibly partitioned files.
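
If pulling in spark-extension is not an option, a plain-Spark approximation of the same idea (shown here as a PySpark sketch, with edgesWithAuthorDate standing in for the enriched DataFrame built above, assumed to be available in a PySpark session) is to repartition by the partition column before writing, so each month lands in a handful of files rather than one file per task:

# edgesWithAuthorDate: the enriched edges DataFrame from the snippet above
(edgesWithAuthorDate
    .repartition("author_month")     # group each month into few tasks before the partitioned write
    .write
    .partitionBy("author_month")
    .mode("overwrite")
    .parquet("commit-modified-edges"))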

EnricoMi commented 2 years ago

Some thoughts on how to make the connector support your use case (#144). Your request would turn into the following DQL query:

query {
  commitNodes as var(func: between(<author_date>, "2020-01-01T00:00:00Z", "2020-01-31T23:59:59Z"))

  result (func: uid(commitNodes), first: 100, offset: 0) {
    <uid>
    <author_date>
    <~commited> { <uid> }
    <modified> { <uid> }
  }
}

This would be straightforward to turn into triples. As soon as you want to retrieve data from IDs or FileNodes, things become tricky, as this can introduce duplicate triples. Also, the "first: 100, offset: 0" arguments would need to be injected to add partitioning. It looks like only a very limited subset of DQL can potentially be supported by this connector.

daveaitel commented 2 years ago

What I REALLY am aiming towards, in this case, is some sort of community detection - although probably one I have to hand-build, because LPA as built into Spark isn't working for me. So, for example, assuming the structure above, figuring out who the "main authors/committers" of a "group of files" are becomes possible. In theory you can assign weights based on how many times you see another file in the same commit as you are in, for example (see the sketch below). The months here are any arbitrary time period, so you can track how things change over time, if that makes sense. :)

Thanks!
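
For what it's worth, a minimal PySpark sketch of the co-occurrence weighting described above, assuming a modified_edges DataFrame with the commit uid in subject and the file uid in objectUid (variable names are illustrative, not connector API):

from pyspark.sql import functions as F

a = modified_edges.select(F.col("subject").alias("commit"), F.col("objectUid").alias("file_a"))
b = modified_edges.select(F.col("subject").alias("commit"), F.col("objectUid").alias("file_b"))

# weight = number of commits in which the two files were modified together
file_pairs = (a.join(b, "commit")
    .where(F.col("file_a") < F.col("file_b"))   # count each unordered pair once
    .groupBy("file_a", "file_b")
    .agg(F.count("*").alias("weight")))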

daveaitel commented 2 years ago

authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
authorDates.limit(10).show(10)
+-------+-------------------+
|node_id|        author_date|
+-------+-------------------+
|7195639|2020-01-09 23:56:17|
|7195640|2020-01-09 23:56:20|
|7195641|2020-01-09 23:56:19|
|7195642|2020-01-09 23:56:18|
|7195647|2020-01-09 23:57:38|
|7195660|2020-01-10 00:08:16|
|7195664|2020-01-10 00:11:23|
|7195667|2020-01-10 00:11:33|
|7195668|2020-01-10 00:11:38|
|7195676|2020-01-10 00:13:33|
+-------+-------------------+

authorDates.count()
1743986

Step one complete (in PySpark, which is a lot more legible to me - and perhaps this code will help some other poor soul doing something similar) :)


daveaitel commented 2 years ago

Ok, I ran into an exception (which is weird, because Dgraph is still up and happy). It's possible I'm mis-translating one of your lines, which I think is just joining and unioning the DataFrames.

from gresearch.spark.dgraph.connector import *
from pyspark.sql import functions as F

edges: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.nodes("localhost:9080")
authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
authorMonths = authorDates.select("node_id", F.date_format("author_date", "yyyy-MM").alias("date_month"))
committedEdges = edges.where("predicate = 'committed'")
modifiedEdges = edges.where("predicate = 'modified'")
committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid")).show(10)

committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid")).show(10)
21/11/19 17:03:28 ERROR Executor: Exception in task 64.0 in stage 15.0 (TID 155)
java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
    at uk.co.gresearch.spark.dgraph.connector.executor.DgraphExecutor.query(DgraphExecutor.scala:57)
    at uk.co.gresearch.spark.dgraph.connector.executor.DgraphExecutor.query(DgraphExecutor.scala:30)
    at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.readChunk(GraphTableModel.scala:93)
    at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.readChunk$(GraphTableModel.scala:85)
    at uk.co.gresearch.spark.dgraph.connector.model.NodeTableModel.readChunk(NodeTableModel.scala:26)
    at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.$anonfun$modelPartition$4(GraphTableModel.scala:81)
    at uk.co.gresearch.spark.dgraph.connector.model.ChunkIterator.next(ChunkIterator.scala:46)
    at uk.co.gresearch.spark.dgraph.connector.model.ChunkIterator.next(ChunkIterator.scala:23)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.next(TriplePartitionReader.scala:28)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.foreach(WholeStageCodegenExec.scala:753)
    at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:47)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: The doRequest encountered an execution exception:
    at uk.co.gresearch.thirdparty.io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:248)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: uk.co.gresearch.thirdparty.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at uk.co.gresearch.thirdparty.io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:216)
    ... 7 more
Caused by: uk.co.gresearch.thirdparty.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at uk.co.gresearch.thirdparty.io.grpc.Status.asRuntimeException(Status.java:533)
    at uk.co.gresearch.thirdparty.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478)
    at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
    at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
    at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
    at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
    at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
    at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
    at uk.co.gresearch.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at uk.co.gresearch.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


EnricoMi commented 2 years ago

The io.grpc.StatusRuntimeException: UNAVAILABLE is 100% Dgraph, so up to this point your PySpark is correct. It looks like the Dgraph instance is down. You should check the output of the alpha and zero nodes, and check that it is still working with Ratel / https://play.dgraph.io.
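
If it helps, one way to confirm the alpha is reachable outside of Spark is a quick query with the pydgraph client (assuming it is installed; this is just a smoke test, not part of the connector):

import pydgraph

stub = pydgraph.DgraphClientStub("localhost:9080")
client = pydgraph.DgraphClient(stub)
txn = client.txn(read_only=True)
try:
    resp = txn.query("{ q(func: has(author_date), first: 1) { uid } }")
    print(resp.json)  # a JSON response means the alpha answered
finally:
    txn.discard()
    stub.close()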

daveaitel commented 2 years ago

I set my chunk size down to 200 and 100 to try again... and the same thing happened. As another data point: there are no errors from my Dgraph, and Ratel is happy as well. I forget how to enable debugging, though, which in theory would provide more data?

I'm just using the single-instance Docker image that has one Alpha and Zero node on it. But I'm not sure why this would cause a connection reset issue? It's weird because pagecount worked fine.

OH. VERY STRANGE. A chunk size of 500 works; a chunk size of 300 does not. Is it possible that with a lower chunk size we run out of file handles or something?!? (Or is there some other error?)

edges: DataFrame = spark.read.option("dgraph.chunksize", 500).dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.option("dgraph.chunksize", 500).dgraph.nodes("localhost:9080")
authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
authorMonths = authorDates.select("node_id", F.date_format("author_date", "yyyy-MM").alias("date_month"))
committedEdges = edges.where("predicate = 'committed'")
modifiedEdges = edges.where("predicate = 'modified'")
committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid")).show(10)
+-------+---------+---------+---------+----------+
|subject|predicate|objectUid|objectUid|date_month|
+-------+---------+---------+---------+----------+
|7195635|committed|  8392773|  7195639|   2020-01|
|7195635|committed|  8392773|  7195640|   2020-01|
|7195635|committed|  8392773|  7195641|   2020-01|
|7195635|committed|  8392773|  7195642|   2020-01|
|7195635|committed|  8392773|  7195647|   2020-01|
|7195635|committed|  8392773|  7195660|   2020-01|
|7195635|committed|  8392773|  7195664|   2020-01|
|7195635|committed|  8392773|  7195667|   2020-01|
|7195635|committed|  8392773|  7195668|   2020-01|
|7195635|committed|  8392773|  7195676|   2020-01|
+-------+---------+---------+---------+----------+
only showing top 10 rows

Thanks!

On Fri, Nov 19, 2021 at 3:18 PM Enrico Minack wrote:

Have you set your chunk size small enough?


EnricoMi commented 1 year ago

Sorry for the late reply. Hope this background info is still relevant.

Smaller chunk sizes mean more partitions. With a large Spark cluster, this means more concurrent reads hitting the Dgraph cluster, which might cause the unresponsiveness.
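
As a quick check of how many read tasks (and therefore concurrent Dgraph queries) a given configuration produces, one can inspect the partition count of the resulting DataFrame; a sketch, assuming the same PySpark session as above:

edges = spark.read.option("dgraph.chunksize", 500).dgraph.edges("localhost:9080")
print(edges.rdd.getNumPartitions())   # number of read tasks Spark will schedule against the alpha

If that number is much larger than a single alpha can serve concurrently, reducing Spark-side parallelism (fewer executor cores, or a larger chunk size as above) may be the simplest lever.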