SmartDataAnalytics / RdfProcessingToolkit

Command line interface based RDF processing toolkit to run sequences of SPARQL statements ad-hoc on RDF datasets, streams of bindings and streams of named graphs with support for processing JSON, CSV and XML using function extensions
https://smartdataanalytics.github.io/RdfProcessingToolkit/

Sansa - KryoException: Buffer overflow #40

Closed TBoonX closed 1 year ago

TBoonX commented 1 year ago

I started with:

sansa query mapping.rq

The mapping file is simple, and the linked CSV is also simple but long (440 MB). For the first few minutes there were jobs using all my cores, then for a while only up to two jobs kept my cores busy, and after 10 minutes or so the error was thrown. Here is the end of the log:

13:12:48 [INFO] [o.a.s.s.DAGScheduler:61] - Missing parents: List()
13:12:48 [INFO] [o.a.s.s.DAGScheduler:61] - Submitting ResultStage 1522 (MapPartitionsRDD[243] at mapPartitions at JavaRddOfBindingsOps.java:144), which has no missing parents
13:12:48 [INFO] [o.a.s.s.m.MemoryStore:61] - Block broadcast_172 stored as values in memory (estimated size 40.0 KiB, free 9.1 GiB)
13:12:48 [INFO] [o.a.s.s.m.MemoryStore:61] - Block broadcast_172_piece0 stored as bytes in memory (estimated size 9.5 KiB, free 9.1 GiB)
13:12:48 [INFO] [o.a.s.s.BlockManagerInfo:61] - Added broadcast_172_piece0 in memory on Pulsar-5047.lan:41025 (size: 9.5 KiB, free: 9.2 GiB)
13:12:48 [INFO] [o.a.s.SparkContext:61] - Created broadcast 172 from broadcast at DAGScheduler.scala:1513
13:12:48 [INFO] [o.a.s.s.DAGScheduler:61] - Submitting 1 missing tasks from ResultStage 1522 (MapPartitionsRDD[243] at mapPartitions at JavaRddOfBindingsOps.java:144) (first 15 tasks are for partitions Vector(75))
13:12:48 [INFO] [o.a.s.s.TaskSchedulerImpl:61] - Adding task set 1522.0 with 1 tasks resource profile 0
13:12:48 [INFO] [o.a.s.s.TaskSetManager:61] - Starting task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan, executor driver, partition 75, NODE_LOCAL, 4380 bytes) taskResourceAssignments Map()
13:12:48 [INFO] [o.a.s.e.Executor:61] - Running task 0.0 in stage 1522.0 (TID 279)
13:12:48 [INFO] [o.a.s.s.ShuffleBlockFetcherIterator:61] - Getting 14 (11.9 MiB) non-empty blocks including 14 (11.9 MiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
13:12:48 [INFO] [o.a.s.s.ShuffleBlockFetcherIterator:61] - Started 0 remote fetches in 0 ms
13:12:52 [ERROR] [o.a.s.e.Executor:98] - Exception in task 0.0 in stage 1522.0 (TID 279)
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392) ~[rpt.jar:?]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593) [rpt.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646) ~[rpt.jar:?]
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20) ~[rpt.jar:?]
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302) ~[rpt.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[rpt.jar:?]
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388) ~[rpt.jar:?]
    ... 4 more
13:12:52 [WARN] [o.a.s.s.TaskSetManager:73] - Lost task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more

13:12:52 [ERROR] [o.a.s.s.TaskSetManager:77] - Task 0 in stage 1522.0 failed 1 times; aborting job
13:12:52 [INFO] [o.a.s.s.TaskSchedulerImpl:61] - Removed TaskSet 1522.0, whose tasks have all completed, from pool 
13:12:52 [INFO] [o.a.s.s.TaskSchedulerImpl:61] - Cancelling stage 1522
13:12:52 [INFO] [o.a.s.s.TaskSchedulerImpl:61] - Killing all running tasks in stage 1522: Stage cancelled
13:12:52 [INFO] [o.a.s.s.DAGScheduler:61] - ResultStage 1522 (hasNext at Iterator.java:132) failed in 4.145 s due to Job aborted due to stage failure: Task 0 in stage 1522.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more

Driver stacktrace:
13:12:52 [INFO] [o.a.s.s.DAGScheduler:61] - Job 78 failed: hasNext at Iterator.java:132, took 4.147336 s
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1522.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more

Driver stacktrace:
    at org.aksw.commons.util.exception.ExceptionUtilsAksw.rethrowUnless(ExceptionUtilsAksw.java:40)
    at org.aksw.commons.util.exception.ExceptionUtilsAksw.rethrowIfNotBrokenPipe(ExceptionUtilsAksw.java:88)
    at org.aksw.rdf_processing_toolkit.cli.cmd.CmdUtils.lambda$callCmd$0(CmdUtils.java:70)
    at picocli.CommandLine.execute(CommandLine.java:2088)
    at org.aksw.rdf_processing_toolkit.cli.cmd.CmdUtils.callCmd(CmdUtils.java:77)
    at org.aksw.rdf_processing_toolkit.cli.cmd.CmdUtils.callCmd(CmdUtils.java:40)
    at org.aksw.rdf_processing_toolkit.cli.cmd.CmdUtils.execCmd(CmdUtils.java:21)
    at org.aksw.rdf_processing_toolkit.cli.main.MainCliRdfProcessingToolkit.main(MainCliRdfProcessingToolkit.java:9)
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1522.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more

Driver stacktrace:
    at net.sansa_stack.spark.io.rdf.output.RddRdfWriter.runUnchecked(RddRdfWriter.java:130)
    at net.sansa_stack.spark.cli.impl.CmdSansaMapImpl.writeOutRdfSources(CmdSansaMapImpl.java:96)
    at net.sansa_stack.spark.cli.impl.CmdSansaQueryImpl.run(CmdSansaQueryImpl.java:126)
    at net.sansa_stack.spark.cli.cmd.CmdSansaQuery.call(CmdSansaQuery.java:45)
    at net.sansa_stack.spark.cli.cmd.CmdSansaQuery.call(CmdSansaQuery.java:13)
    at picocli.CommandLine.executeUserObject(CommandLine.java:1953)
    at picocli.CommandLine.access$1300(CommandLine.java:145)
    at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2358)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2352)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2314)
    at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
    at picocli.CommandLine$RunLast.execute(CommandLine.java:2316)
    at picocli.CommandLine.execute(CommandLine.java:2078)
    ... 4 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1522.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1522.0 (TID 279) (Pulsar-5047.lan executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.rdd.RDD.collectPartition$1(RDD.scala:1036)
    at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3(RDD.scala:1038)
    at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3$adapted(RDD.scala:1038)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
    at scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:31)
    at net.sansa_stack.spark.io.rdf.output.RddRdfWriter.runOutputToConsole(RddRdfWriter.java:206)
    at net.sansa_stack.spark.io.rdf.output.RddRdfWriter.run(RddRdfWriter.java:137)
    at net.sansa_stack.spark.io.rdf.output.RddRdfWriter.runUnchecked(RddRdfWriter.java:128)
    ... 16 more
Caused by: org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:392)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
    at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:682)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:20)
    at org.aksw.jenax.io.kryo.jena.QuadSerializer.write(QuadSerializer.java:16)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:388)
    ... 4 more
13:12:52 [INFO] [o.a.s.SparkContext:61] - Invoking stop() from shutdown hook
13:12:52 [INFO] [o.s.j.s.AbstractConnector:383] - Stopped Spark@396e5758{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
13:12:52 [INFO] [o.a.s.u.SparkUI:61] - Stopped Spark web UI at http://Pulsar-5047.lan:4040
13:12:52 [INFO] [o.a.s.MapOutputTrackerMasterEndpoint:61] - MapOutputTrackerMasterEndpoint stopped!
13:12:52 [INFO] [o.a.s.s.m.MemoryStore:61] - MemoryStore cleared
13:12:52 [INFO] [o.a.s.s.BlockManager:61] - BlockManager stopped
13:12:52 [INFO] [o.a.s.s.BlockManagerMaster:61] - BlockManagerMaster stopped
13:12:52 [INFO] [o.a.s.s.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:61] - OutputCommitCoordinator stopped!
13:12:52 [INFO] [o.a.s.SparkContext:61] - Successfully stopped SparkContext
13:12:52 [INFO] [o.a.s.u.ShutdownHookManager:61] - Shutdown hook called
13:12:52 [INFO] [o.a.s.u.ShutdownHookManager:61] - Deleting directory /tmp/spark-c4c495f7-e3c7-4dbb-ba74-b222d203e83e

I have 64 GB of RAM, which was at 60% usage, and 16 logical cores.

Aklakan commented 1 year ago

Can you run

rpt sansa analyze csv your-data.csv --out-file report.ttl

(See also https://sansa-stack.github.io/SANSA-Stack/cli/tarql.html#inspecting-csv-files)

and check whether it can parse the CSV file correctly? It should output an RDF document with parsing information about each split of the CSV file.

TBoonX commented 1 year ago

Output:

@prefix eg: <http://www.example.org/> .
@prefix xds: <http://www.w3.org/2001/XMLSchema#> .

_:b0    eg:totalDuration  "0.012"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b1    eg:regionEndProbeResult    _:b0 ;
        eg:totalElementCount       "860636"^^xds:long ;
        eg:totalBytesRead          "33554513"^^xds:long ;
        eg:totalTime               "0.7905820520000001"^^xds:double ;
        eg:splitStart              "0"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:regionStartProbeResult  _:b2 ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:totalRecordCount        "860637"^^xds:long ;
        eg:regionStartSearchReadOverRegionEnd  false .

_:b2    eg:totalDuration  "0.0"^^xds:double ;
        eg:probeCount     "0"^^xds:long ;
        eg:candidatePos   "0"^^xds:long .

_:b3    eg:regionEndProbeResult    _:b4 ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:totalBytesRead          "33554469"^^xds:long ;
        eg:splitStart              "33554432"^^xds:long ;
        eg:totalRecordCount        "816606"^^xds:long ;
        eg:totalElementCount       "816606"^^xds:long ;
        eg:totalTime               "0.41145543700000003"^^xds:double ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:regionStartProbeResult  _:b5 .

_:b4    eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "37"^^xds:long .

_:b5    eg:totalDuration  "0.007"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b6    eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "45"^^xds:long .

_:b7    eg:totalDuration  "0.008"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "37"^^xds:long .

_:b8    eg:totalTime               "0.423793693"^^xds:double ;
        eg:totalRecordCount        "799251"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:totalElementCount       "799251"^^xds:long ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:splitStart              "67108864"^^xds:long ;
        eg:totalBytesRead          "33554477"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:regionStartProbeResult  _:b7 ;
        eg:regionEndProbeResult    _:b6 .

_:b9    eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "45"^^xds:long .

_:b10   eg:regionStartProbeResult  _:b9 ;
        eg:totalTime               "0.34524198"^^xds:double ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:totalRecordCount        "810578"^^xds:long ;
        eg:totalElementCount       "810578"^^xds:long ;
        eg:splitStart              "100663296"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalBytesRead          "33554513"^^xds:long ;
        eg:regionEndProbeResult    _:b11 ;
        eg:regionStartSearchReadOverRegionEnd  false .

_:b11   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b12   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b13   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b14   eg:splitStart              "134217728"^^xds:long ;
        eg:totalBytesRead          "33554513"^^xds:long ;
        eg:regionEndProbeResult    _:b12 ;
        eg:totalTime               "0.33095979400000003"^^xds:double ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:totalRecordCount        "803622"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalElementCount       "803622"^^xds:long ;
        eg:regionStartProbeResult  _:b13 ;
        eg:regionStartSearchReadOverRegionEnd  false .

_:b15   eg:splitStart              "167772160"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionStartProbeResult  _:b16 ;
        eg:regionEndProbeResult    _:b17 ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalTime               "0.35778299"^^xds:double ;
        eg:totalRecordCount        "804767"^^xds:long ;
        eg:totalBytesRead          "33554497"^^xds:long ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalElementCount       "804767"^^xds:long .

_:b16   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "81"^^xds:long .

_:b17   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "65"^^xds:long .

_:b18   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "65"^^xds:long .

_:b19   eg:regionStartSearchReadOverSplitEnd  false ;
        eg:splitStart              "201326592"^^xds:long ;
        eg:totalBytesRead          "33554507"^^xds:long ;
        eg:regionEndProbeResult    _:b20 ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalRecordCount        "812414"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalElementCount       "812414"^^xds:long ;
        eg:totalTime               "0.334013382"^^xds:double ;
        eg:regionStartProbeResult  _:b18 .

_:b20   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "75"^^xds:long .

_:b21   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "82"^^xds:long .

_:b22   eg:totalRecordCount        "809813"^^xds:long ;
        eg:splitStart              "234881024"^^xds:long ;
        eg:regionStartProbeResult  _:b23 ;
        eg:totalBytesRead          "33554514"^^xds:long ;
        eg:totalTime               "0.361883591"^^xds:double ;
        eg:regionEndProbeResult    _:b21 ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalElementCount       "809813"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int .

_:b23   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "75"^^xds:long .

_:b24   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "82"^^xds:long .

_:b25   eg:totalElementCount       "814884"^^xds:long ;
        eg:splitStart              "268435456"^^xds:long ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:totalTime               "0.34096873"^^xds:double ;
        eg:totalBytesRead          "33554494"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionStartProbeResult  _:b24 ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalRecordCount        "814884"^^xds:long ;
        eg:regionEndProbeResult    _:b26 .

_:b26   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "62"^^xds:long .

_:b27   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "70"^^xds:long .

_:b28   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "62"^^xds:long .

_:b29   eg:regionStartProbeResult  _:b28 ;
        eg:splitStart              "301989888"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalElementCount       "812127"^^xds:long ;
        eg:totalRecordCount        "812127"^^xds:long ;
        eg:totalTime               "0.34634130300000004"^^xds:double ;
        eg:totalBytesRead          "33554502"^^xds:long ;
        eg:regionEndProbeResult    _:b27 .

_:b30   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "70"^^xds:long .

_:b31   eg:regionStartSearchReadOverRegionEnd  false ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:totalTime               "0.33287019900000003"^^xds:double ;
        eg:regionStartProbeResult  _:b30 ;
        eg:splitStart              "335544320"^^xds:long ;
        eg:totalRecordCount        "809327"^^xds:long ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalBytesRead          "33554491"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:totalElementCount       "809327"^^xds:long ;
        eg:regionEndProbeResult    _:b32 .

_:b32   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "59"^^xds:long .

_:b33   eg:totalTime               "0.345964153"^^xds:double ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalElementCount       "811930"^^xds:long ;
        eg:regionEndProbeResult    _:b34 ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:splitStart              "369098752"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:totalBytesRead          "33554494"^^xds:long ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:totalRecordCount        "811930"^^xds:long ;
        eg:regionStartProbeResult  _:b35 .

_:b35   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "59"^^xds:long .

_:b34   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "62"^^xds:long .

_:b36   eg:totalTime               "0.32924473800000004"^^xds:double ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionEndProbeResult    _:b37 ;
        eg:tailElementCount        "1"^^xds:int ;
        eg:splitSize               "33554432"^^xds:long ;
        eg:regionStartProbeResult  _:b38 ;
        eg:totalElementCount       "797831"^^xds:long ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalRecordCount        "797831"^^xds:long ;
        eg:totalBytesRead          "33554493"^^xds:long ;
        eg:splitStart              "402653184"^^xds:long .

_:b37   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "61"^^xds:long .

_:b38   eg:totalDuration  "0.004"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "62"^^xds:long .

_:b39   eg:totalDuration  "0.061000000000000006"^^xds:double ;
        eg:probeCount     "0"^^xds:long ;
        eg:candidatePos   "-1"^^xds:long .

_:b40   eg:splitStart              "436207616"^^xds:long ;
        eg:totalElementCount       "570104"^^xds:long ;
        eg:regionStartSearchReadOverRegionEnd  false ;
        eg:totalRecordCount        "570103"^^xds:long ;
        eg:regionStartSearchReadOverSplitEnd  false ;
        eg:regionStartProbeResult  _:b41 ;
        eg:splitSize               "24421635"^^xds:long ;
        eg:totalBytesRead          "24421635"^^xds:long ;
        eg:regionEndProbeResult    _:b39 ;
        eg:tailElementCount        "0"^^xds:int ;
        eg:totalTime               "0.30285072300000004"^^xds:double .

_:b41   eg:totalDuration  "0.005"^^xds:double ;
        eg:probeCount     "1"^^xds:long ;
        eg:candidatePos   "61"^^xds:long .

Aklakan commented 1 year ago

Hm, so the CSV parsing looks OK - what happens when you increase the Kryo buffer size?

EXTRA_OPTS="-Dspark.kryoserializer.buffer.max=2000000000" rpt
java -D "-Dspark.kryoserializer.buffer.max=2000000000" -jar
TBoonX commented 1 year ago

Correction:

java -Dspark.kryoserializer.buffer.max=2048 -jar ...

The maximum size of the buffer is < 2048 MB, and the -D parameter has to be used differently (as a single -Dkey=value argument, as shown above).
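
For reference, a complete invocation combining this setting with the original command might look like the sketch below. The jar name rpt.jar is assumed from the stack trace above, and it is assumed that the jar accepts the same sansa query subcommand as the rpt launcher; adjust the path and value to your setup.

# untested sketch: run the original query with an enlarged Kryo buffer limit (value in MB)
java -Dspark.kryoserializer.buffer.max=2048 -jar rpt.jar sansa query mapping.rq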

Aklakan commented 1 year ago

Does increasing the Kryo buffer size have any effect? Since the CSV parsing seems to work, it would indicate that a single partition of CSV data maps to a very large amount of RDF data (maybe the mapping produces many duplicates?).

A mapping where several thousand triples are attached to the same subject (e.g. due to an incorrect mapping) might also cause this issue; it was somehow related to very large Turtle blocks being formed that exceed internal thresholds.

Maybe switching to N-Triples serialization makes the issue go away?
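
If you try that, a hypothetical invocation could look like the line below; the --out-format flag name is an assumption and should be checked against the actual CLI help (rpt sansa query --help).

# hypothetical: write N-Triples instead of Turtle (flag name not verified)
rpt sansa query mapping.rq --out-format ntriples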

TBoonX commented 1 year ago

It works with the buffer parameter, thanks!

Aklakan commented 1 year ago

Updated the Sansa CLI to use Kryo's maximum buffer size of 2048 MB by default. It is possible to override it with a smaller value, but I am not sure there is a good reason to do so.

https://github.com/SANSA-Stack/SANSA-Stack/commit/49483992c978f0f44d777abca91aae3dc2167103 (I realize I should have created a separate issue at sansa but oh well)
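
For completeness, if a smaller value is ever needed with a build that includes this change, presumably the same JVM system property shown earlier can be used to override the default, assuming the CLI only applies 2048 when the property is not already set (not verified):

# untested assumption: override the new default with a smaller Kryo buffer limit (value in MB)
java -Dspark.kryoserializer.buffer.max=512 -jar rpt.jar sansa query mapping.rq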