filodb / FiloDB

Distributed Prometheus time series database
Apache License 2.0
1.43k stars 225 forks source link

Dataset creation ERROR DatasetCoordinatorActor: #152

Open PallaviPersistent opened 6 years ago

PallaviPersistent commented 6 years ago

Branch, version, commit

OS and Environment Red Hat Enterprise Linux Server release 7.2 (Maipo)

JVM version java -version java version "1.8.0_161" Java(TM) SE Runtime Environment (build 1.8.0_161-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

Scala version Scala version 2.11.8

Kafka and Cassandra versions and setup apache-cassandra-3.11.2

Spark version if used 2.0.0

Deployed mode (client/cluster on Spark Standalone/YARN/Mesos/EMR or default) Spark Standalone

Actual (wrong) behavior parDFnew.write.format("filodb.spark"). option("dataset", "parDFNF"). option("row_keys", "appId"). option("partition_keys", "exportMs"). mode(SaveMode.Overwrite).save()

[Stage 2:> (0 + 2) / 2]18/04/10 02:40:47 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.parDFNF/0) java.lang.NullPointerException at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270) at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154) at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16) at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251) at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308) at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36) at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303) at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70) at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53) at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55) at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/04/10 02:40:47 ERROR OneForOneStrategy: java.lang.NullPointerException at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270) at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154) at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16) at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251) at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308) at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36) at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303) at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70) at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53) at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55) at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/04/10 02:40:47 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-parDFNF-0#-1168010970] 
has terminated! Ingestion for (filodb.parDFNF,0) will stop. 18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_0_1) No Acks received for last 20 seconds 18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_1_0) No Acks received for last 20 seconds

Steps to reproduce scala> val files=Seq("/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat","/root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat") files: Seq[String] = List(/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat, /root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat)

scala> val parDF=sqlContext.read.parquet(files:_*) SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-core-3.2.10.jar." 18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-rdbms-3.2.9.jar." 18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-api-jdo-3.2.6.jar." parDF: org.apache.spark.sql.DataFrame = [epoch: bigint, rowNum: bigint ... 38 more fields]

scala> val parDFnew=parDF.withColumn("exporterIp", 'exporterIp.cast("String")).withColumn("srcIp", 'srcIp.cast("String")).withColumn("dstIp", 'dstIp.cast("String")).withColumn("nextHopIp", 'nextHopIp.cast("String")).withColumn("bgpNextHopIp", 'bgpNextHopIp.cast("String")).withColumn("appId", 'appId.cast("String")).withColumn("policyQosClassificationHierarchy", 'policyQosClassificationHierarchy.cast("String")).withColumn("protocolId", 'protocolId.cast("Int")).withColumn("srcTos", 'srcTos.cast("Int")).withColumn("dstTos", 'dstTos.cast("Int")).withColumn("srcMask", 'srcMask.cast("Int")).withColumn("dstMask", 'dstMask.cast("Int")).withColumn("direction", 'direction.cast("String")).select('epoch, 'srcIp, 'dstIp, 'exporterIp, 'rowNum, 'exportMs, 'pktSeqNum, 'flowSeqNum, 'protocolId, 'srcTos, 'dstTos, 'srcMask, 'dstMask, 'tcpBits, 'srcPort, 'inIfId, 'inIfEntityId, 'inIfEnabled, 'dstPort, 'outIfId, 'outIfEntityId, 'outIfEnabled, 'direction, 'inOctets, 'outOctets, 'inPackets, 'outPackets, 'nextHopIp, 'bgpSrcAsNum, 'bgpDstAsNum, 'bgpNextHopIp, 'endMs, 'startMs, 'appId, 'appName, 'srcIpGroup, 'dstIpGroup, 'policyQosClassificationHierarchy, 'policyQosQueueId, 'workerId) parDFnew: org.apache.spark.sql.DataFrame = [epoch: bigint, srcIp: string ... 38 more fields]

parDFnew.write.format("filodb.spark"). option("dataset", "parDFNF"). option("row_keys", "appId"). option("partition_keys", "exportMs"). mode(SaveMode.Overwrite).save()

Logs

some log

or as attached file (see below)

Unused parts of this template should be removed (including this line).

shiwanshujalan commented 6 years ago

When we set bulk_write mode to true we get a NullPointerException, while turning it off resolves it. For reference, the row_keys column used is not unique, but that is the requirement: we need to ingest all the records without overwriting them, and overwriting is what happens when we set bulk_write to false.

velvia commented 6 years ago

Ah, ok. May I ask what is your overall application and use case? You could try using for example a timestamp as a unique row key. Spark also has an auto incrementing function, but the last time I tried it it didn't work.

On May 22 2018, at 2:58 am, shiwanshujalan notifications@github.com wrote:

when we are using bulk_write mode as true we are getting null point exception while turning it off resolves it. for reference the rows_key column used is not unique but that's the requirement and we need to ingest all the records without overwriting them which happens when we set bulk_write as false. — You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-390934262), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA327GlREY-_oSj7v0efzqArxaOVwH9ks5t0-FagaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

Here is my use case.

We are trying to insert 18 million records in filoDB using parquet files. we have around 39 columns in records. Below is the schema definition for dataset. scala> parDF.printSchema root |-- epoch: long (nullable = true) |-- rowNum: long (nullable = true) |-- exportMs: long (nullable = true) |-- exporterIp: binary (nullable = true) |-- pktSeqNum: long (nullable = true) |-- flowSeqNum: integer (nullable = true) |-- srcIp: binary (nullable = true) |-- dstIp: binary (nullable = true) |-- protocolId: short (nullable = true) |-- srcTos: short (nullable = true) |-- dstTos: short (nullable = true) |-- srcMask: byte (nullable = true) |-- dstMask: byte (nullable = true) |-- tcpBits: integer (nullable = true) |-- srcPort: integer (nullable = true) |-- inIfId: long (nullable = true) |-- inIfEntityId: long (nullable = true) |-- inIfEnabled: boolean (nullable = true) |-- dstPort: integer (nullable = true) |-- outIfId: long (nullable = true) |-- outIfEntityId: long (nullable = true) |-- outIfEnabled: boolean (nullable = true) |-- direction: byte (nullable = true) |-- inOctets: long (nullable = true) |-- outOctets: long (nullable = true) |-- inPackets: long (nullable = true) |-- outPackets: long (nullable = true) |-- nextHopIp: binary (nullable = true) |-- bgpSrcAsNum: long (nullable = true) |-- bgpDstAsNum: long (nullable = true) |-- bgpNextHopIp: binary (nullable = true) |-- endMs: long (nullable = true) |-- startMs: long (nullable = true) |-- appId: binary (nullable = true) |-- appName: string (nullable = true) |-- srcIpGroup: string (nullable = true) |-- dstIpGroup: string (nullable = true) |-- policyQosClassificationHierarchy: binary (nullable = true) |-- policyQosQueueId: long (nullable = true) |-- workerId: integer (nullable = true)

we are converting the non supported datatypes using below command. scala> val dfResult = parDF.withColumn("exporterIp", inetToString(parDF("exporterIp"))).withColumn("srcIp", inetToString(parDF("srcIp"))).withColumn("dstIp", inetToString(parDF("dstIp"))).withColumn("nextHopIp", inetToString(parDF("nextHopIp"))).withColumn("bgpNextHopIp", inetToString(parDF("bgpNextHopIp"))).withColumn("appId", inetToString(parDF("appId"))).withColumn("policyQosClassificationHierarchy", inetToString(parDF("policyQosClassificationHierarchy"))).withColumn("protocolId", parDF("protocolId").cast(IntegerType)).withColumn("srcTos", parDF("srcTos").cast(IntegerType)).withColumn("dstTos", parDF("dstTos").cast(IntegerType)).withColumn("srcMask", parDF("srcMask").cast(StringType)).withColumn("dstMask", parDF("dstMask").cast(StringType)).withColumn("direction", parDF("direction").cast(StringType)).withColumn("inIfEnabled", parDF("inIfEnabled").cast(StringType)).withColumn("outIfEnabled", parDF("outIfEnabled").cast(StringType)) dfResult: org.apache.spark.sql.DataFrame = [epoch: bigint, rowNum: bigint ... 38 more fields]

after conversion the schema looks like below. scala> dfResult.printSchema root |-- epoch: long (nullable = true) |-- rowNum: long (nullable = true) |-- exportMs: long (nullable = true) |-- exporterIp: string (nullable = true) |-- pktSeqNum: long (nullable = true) |-- flowSeqNum: integer (nullable = true) |-- srcIp: string (nullable = true) |-- dstIp: string (nullable = true) |-- protocolId: integer (nullable = true) |-- srcTos: integer (nullable = true) |-- dstTos: integer (nullable = true) |-- srcMask: string (nullable = true) |-- dstMask: string (nullable = true) |-- tcpBits: integer (nullable = true) |-- srcPort: integer (nullable = true) |-- inIfId: long (nullable = true) |-- inIfEntityId: long (nullable = true) |-- inIfEnabled: string (nullable = true) |-- dstPort: integer (nullable = true) |-- outIfId: long (nullable = true) |-- outIfEntityId: long (nullable = true) |-- outIfEnabled: string (nullable = true) |-- direction: string (nullable = true) |-- inOctets: long (nullable = true) |-- outOctets: long (nullable = true) |-- inPackets: long (nullable = true) |-- outPackets: long (nullable = true) |-- nextHopIp: string (nullable = true) |-- bgpSrcAsNum: long (nullable = true) |-- bgpDstAsNum: long (nullable = true) |-- bgpNextHopIp: string (nullable = true) |-- endMs: long (nullable = true) |-- startMs: long (nullable = true) |-- appId: string (nullable = true) |-- appName: string (nullable = true) |-- srcIpGroup: string (nullable = true) |-- dstIpGroup: string (nullable = true) |-- policyQosClassificationHierarchy: string (nullable = true) |-- policyQosQueueId: long (nullable = true) |-- workerId: integer (nullable = true)

Here is the distribution for all the column values.

+-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+ |epoch| rowNum|exportMs|exporterIp|pktSeqNum|flowSeqNum| srcIp| dstIp|protocolId|srcTos|dstTos|srcMask|dstMask|tcpBits|srcPort|inIfId|inIfEntityId|inIfEnabled|dstPort|outIfId|outIfEntityId|outIfEnabled|direction|inOctets|outOctets|inPackets|outPackets|nextHopIp|bgpSrcAsNum|bgpDstAsNum|bgpNextHopIp| endMs|startMs|appId|appName|srcIpGroup|dstIpGroup|policyQosClassificationHierarchy|policyQosQueueId|workerId| +-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+ | 11|2400000| 481| 1| 400200| 30|15721052|15720630| 4| 1| 0| 1| 1| 1| 32767| 5| 5| 1| 7014| 5| 5| 1| 1| 1000000| 0| 9958| 0| 17962532| 1| 1| 0|521452| 625455| 7015| 24159| 0| 0| 0| 0| 1| +-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+

From the above distribution you can see that there is only one column ("nextHopIp") with a high number of unique values (close to 18 million). So in our present use case we are using partition key -> protocolId (total distinct count = 4) and row_keys -> nextHopIp (total distinct count = 17962532). Below are the commands executed to insert the data.

val dfResult1 = dfResult.sort($"protocolId") dfResult1.write.format("filodb.spark").option("dataset", "flow25").option("row_keys", "nextHopIp").option("partition_keys", "protocolId").mode(SaveMode.Append).save()

The write command fails with a NullPointerException.

Earlier we used the following keys: partition key -> protocolId (total distinct count = 4) and row_keys -> exportMs (total distinct count = 481). This combination didn't throw any error, but since this row key has very few distinct values we cannot use it: rows with a duplicate primary key are overwritten because bulk write is set to false. We have also tried setting bulk write to true, but that doesn't work either, as the total number of records inserted into FiloDB is very low (around 3k). Please find below the write command used. df.write.format("filodb.spark"). option("dataset", "flow25"). option("row_keys", "workerId,inIfEntityId,outIfEntityId"). option("partition_keys", "workerId,exportMs"). option("filodb.reprojector.bulk-write-mode","true"). mode(SaveMode.Append).save()

velvia commented 6 years ago

@shiwanshujalan the fix is simple, I'll see if I can get something for you to try. However, just a warning that we are making major changes under the hood, so the version on master will not really be maintained.

shiwanshujalan commented 6 years ago

Hi, It would be much appreciated if we could get a timeline for the fix, as this is a complete blocker for our POC. Currently we have around 3 POCs which are centred around FiloDB. Also, can you tell us when the next version will be released?

Apart from the above-mentioned issue, I have managed to ingest 18 million records into FiloDB (using small chunks), but the space occupied is very high (around 1.5 GB) compared to the Parquet file used (900 MB). Do you have any suggestions on how to reduce the space used in FiloDB?

Once this data is inserted we will be doing ad hoc aggregation on it, so we would also appreciate it if you could help us with the data modelling.

Thanks.

velvia commented 6 years ago

Hi,

Timeline: most likely this weekend. Size: chunk size is heavily dependent on your data model. We can go into it a little bit. It can be optimized, but even at optimal sizes it is likely to be not quite as small as Parquet. How data is compressed in Cassandra also makes a big difference. Using the highest setting would help as well.

Next version: in the fall, but it will have a very different focus and premise. There is a branch you can look at for a preview. I'm curious how you found FiloDB and what you are comparing it against. On May 24 2018, at 6:38 am, shiwanshujalan notifications@github.com wrote:

Hi, I will be much appreciated if we can get some timeline for the fix as this is a complete blocked for our POC. Currently we have around 3 POCs which are centred around FiloDB. Also can you tell us when the next version will be released.

Apart from the above mentioned issue i have somehow ingested 18 million records in FiloDB (using small chunks) but the space occupied is very high(around 1.5G) as compared to parquet file used(900 Mb). Do you have any suggestion on how to reduce the space in FiloDB. Once this data is inserted we will be donig the adhoc aggregation on this data hence we would also appreciate if you could help us with the Data modelling. Thanks. — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-391717828), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA329IZ5se8_7SgLfLsCfnjoTESPg37ks5t1rfTgaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

Hi, Thanks for providing valuable suggestions for the data compression. I have changed the compression algorithm in the filodb-defaults.conf file and am currently using DeflateCompression. The chunk size has now been reduced from 1.5 GB to around 800 MB. If any further compression is possible, to get it down to somewhere around 500 MB, that would be great; can you suggest anything?

Also, will the fix you are working on resolve the bulk write issue?

Let me explain the entire use case.

Currently we are ingesting around 18 million records in the form of Parquet files, and in addition we are running around 32 aggregations on that data at time intervals of 1 min, 15 mins, 30 mins, 1 hour, 6 hours and 24 hours. The issue we are currently facing is data corruption because of the Parquet file size. Hence we are looking for a solution that compresses the data, which will serve two purposes:

  1. Better storage
  2. Faster reading time. Hence we have zeroed in on FiloDB.

So in a typical use case we will have a continuous stream of data coming into the system (roughly 18 million records/min) and we will be ingesting that data into FiloDB. Once the data is inserted, we will read it from FiloDB and carry out aggregations on the time series data, using exportMs as our range column. The expectation is to get all 32 aggregations done within a minute, so that users have data available for visualization at one-minute granularity.

i am attaching the aggregations file for your reference.

vidyut_agg.txt

In the attached aggregation file, the pipe-separated columns are the grouping keys on the data. We will be carrying out 4 aggregations (sum, min, max, count) on the entire raw data, and all 4 aggregations will be applied to 4 different columns; hence, for each record in the attached aggregation file, we are applying 16 computations in all (4 computations on each column). Currently we have tried performing the aggregations using Spark while reading data from Parquet files on a 3-node cluster, and are getting a performance of around 75 secs. Each node has 32 CPUs and 64 GB RAM.

Our problem is that as the data grows in the Parquet files, performance starts degrading. Hence we are looking for a faster retrieval time from a DB, which is where FiloDB comes into the picture.

A few questions I want to ask:

  1. Is there any API which FiloDB supports where we can read data from FiloDB filtered on the basis of a time range? Currently, from the documentation I can see the below command, which loads the data from FiloDB into a DataFrame. val df2=spark.read.format("filodb.spark").option("dataset","flow_protocol_cmprss1_nextHopIp_deflate").load() My concern is that this command loads the entire dataset into the DataFrame, and as the dataset size grows there will be performance degradation; hence we are trying to find a way to load only filtered data from FiloDB on the basis of a time interval, to avoid unwanted I/O.

  2. Can you provide some suggestions on the data modelling side, on the basis of the attached aggregation sheet?

  3. I ingested around 10 million records into FiloDB in one go and it is taking around 4 mins to load the data. Is there any optimization possible to improve ingestion, and what are the performance stats for data ingestion in FiloDB?

I am also attaching snapshot of our data for your reference. data.txt

Thanks.

velvia commented 6 years ago

@PallaviPersistent I just pushed a new version of filo-scala 0.3.7. So try modifying the dependency in build.sbt for filo-scala to 0.3.7 and see if that solves the problem - it should fix the NPE at least.

shiwanshujalan commented 6 years ago

Hi,

Thanks for providing fix for NPE in quick time. I have tested the fix against 18 million records, unfortunately it is still failing with NPE intermitently. PFB the error logs. scala> dfResult1.write.format("filodb.spark").option("dataset", "Netflow_protocol_3").option("row_keys", "exportMs,nextHopIp").option("partition_keys", "inIfId").option("filodb.reprojector.bulk-write-mode", "true").mode(SaveMode.Append).save() [INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension 18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. [Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0) java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at 
filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 ERROR OneForOneStrategy: java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at 
filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop. 
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034) java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job 18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally) org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD.count(RDD.scala:1115) at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170) at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119) at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) ... 48 elided Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

Also, when I look at memory consumption, it is taking a lot of memory for computation. Is this normal behavior with FiloDB?

I tried to use the bulk write option to check whether it appends duplicate records in FiloDB, but it doesn't seem to be working for us. Can you also provide a solution for this, as it would be very useful for us?

Also, it is currently taking around 7 mins to ingest 18 million records into FiloDB. Can you provide some suggestions on this, as our expectation is around a minute for ingestion?

Appreciate your prompt help on this.

Thanks.

velvia commented 6 years ago

Hi, are you sure the error trace is from the new version of filo-scala? I'm not quite sure how an NPE is possible from that line number.

Bulk write option - this might result in duplicates, yes. What do you mean by "it is not working"; are you referring to the NPE? 7 min and memory - it's probably not a good idea to measure performance when there are NPEs. When there is an error, the whole ingestion stack gets restarted, and Spark tries to restart the task and reingest multiple times. Memory usage - which stat are you measuring? There is a static amount of memory always used because of the write buffers, plus dynamic garbage, a lot of which probably gets generated since, in order to do columnar writes, we need to buffer enough data to turn into chunks. Data model - I'll have more of a look later, but you essentially have at least two different dimensions. If time is the primary dimension you care about, you could put time into the partition key, which would reduce the partitions you need to read; and if you need to select time within a partition, you could put time as part of the row key as well, which would make it possible to range select by time within a partition. Note that the selection happens when you issue the SQL statement or, equivalently, the df.where(....) condition; if that condition is on the partition and/or row keys, then those get pushed down to the FiloDB datasource, so that not all rows are read. This is key to read performance. On May 29 2018, at 5:58 am, shiwanshujalan notifications@github.com wrote:

Hi, Thanks for providing fix for NPE in quick time. I have tested the fix against 18 million records, unfortunately it is still failing with NPE intermitently. PFB the error logs. scala> dfResult1.write.format("filodb.spark").option("dataset", "Netflow_protocol_3").option("row_keys", "exportMs,nextHopIp").option("partition_keys", "inIfId").option("filodb.reprojector.bulk-write-mode", "true").mode(SaveMode.Append).save() [INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension 18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. [Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0) java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at 
filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 ERROR OneForOneStrategy: java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at 
filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop. 
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034) java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job 18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally) org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD.count(RDD.scala:1115) at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170) at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119) at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) ... 48 elided Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

Also, when I look at memory consumption, it is taking a lot of memory for computation; is this normal behavior with FiloDB? I tried to use the bulk write option to check whether it appends the duplicate records in FiloDB, but it doesn't seem to be working for us. Can you also provide a solution for this, as it would be very useful for us?

Also currently its taking around 7 mins to ingest 18 million records into FiloDB, can you provide some suggestions around this as our expectation are around a min for ingestion. Appreciate your prompt help on this. Thanks. — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-392766268), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32_hFcZ2xSxfG1naNj3S-BW5uInGNks5t3UXZgaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

Hi, yes, I have replaced the filo-scala version in the build.sbt file, changing it to 0.3.7. Then I recreated the build using the commands below: `sbt clean`, `./filo-cli --command init`, `sbt spark/assembly`. Also, what we are currently observing is that whenever we ingest around 2.4 million records/min continuously into FiloDB, we get a NullPointerException in almost every third run.

With the bulk write option, when we ingest duplicate data it still overwrites the existing primary key records, hence this option is not working in our case.

The time taken to ingest 18 million records is coming to around 7 mins; this time is for a run in which there was no NPE and the entire 18 million records were ingested in one shot.

On the memory front, what we are observing is that when we ingest data through the spark shell, the first 18 million records get ingested, but if we then try to ingest another 18 million records (all duplicates) in the same shell, it gives us Java runtime memory errors. When we exit the spark shell, enter it again, and then try to load the data, it works fine.

As part of our POC we will be ingesting around 2.4 million records/min into FiloDB continuously for 2 days, but since we are getting the NPE we are stuck with ingestion.

Can you advise on the above points and also look into the NPE?

shiwanshujalan commented 6 years ago

Hi, I have rebuilt the Filo jar using version 0.3.7 in build.sbt.

PFB the snapshot of the jar imported during initialization.

[info] Including: logback-core-1.0.7.jar [info] Including: jnr-ffi-2.1.6.jar [info] Including: jffi-1.2.16-native.jar [info] Including: filo-scala_2.11-0.3.7.jar [info] Including: boon-0.33.jar [info] Including: akka-actor_2.11-2.3.15.jar

As you can see, the latest filo-scala jar is created.

But i am getting NPE during ingestion of 18 million records.

I am attaching the entire run logs for your reference.

filodb_run.txt

From it I can confirm that the fix is not working as expected. Could you please check from the logs whether something is missing on my end?

Also, can we connect during your working hours so that I can showcase the entire run? If something needs to be tried and tested, we can do it right away, which would really expedite our work.

Thanks.

velvia commented 6 years ago

Hmmm. I'm sorry to hear the fix did not work.

Connecting during working hours is probably not easy. Which time zone are you located in? I think the easiest is if there is any way to get some anonymized, smallest subset of data that can reproduce the error. However, let me be honest - the core problem is that we are no longer working on the version that is in master, we are focused on a newer version with very different internals. Thus what I would do is try to apply the data to our newer version and see what works and breaks. On Jun 4 2018, at 1:32 am, shiwanshujalan notifications@github.com wrote:

Hi, I have rebuild the Filo jar using 0.3.7 version in build.sbt.

PFB the snapshot of the jar imported during initialization. nfo] Including: logback-core-1.0.7.jar [info] Including: jnr-ffi-2.1.6.jar [info] Including: jffi-1.2.16-native.jar [info] Including: filo-scala_2.11-0.3.7.jar [info] Including: boon-0.33.jar [info] Including: akka-actor_2.11-2.3.15.jar

As you can see the lates filo-scala jar is created. But i am getting NPE during ingestion of 18 million records. I am attaching the entire run logs for your reference. filodb_run.txt (https://github.com/filodb/FiloDB/files/2067408/filodb_run.txt) From it i can confirm that the fix is not working as expected. Could you please check if somthing is missing from my end from the logs. Also can we connect during your working hours so that i can showcase the entire run and if something needs to tried and tested we can do it right away as it will really expedite our work. Thanks. — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-394275970), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32w7ErhbgKtRxKGAD_V5ytNPf3Kyxks5t5PCQgaJpZM4TOM-d).

PallaviPersistent commented 6 years ago

We can try doing that by applying a filter on the records and writing the same, but the issue is intermittent. Is the new version available on git so that we can use it for our testing? We work in IST hours.

shiwanshujalan commented 6 years ago

Hi,

We are in the India time zone. It would be really helpful if we could connect for one session. We will be able to present the entire scenario, since this issue is intermittent and we cannot provide the entire data set.

Thanks.

velvia commented 6 years ago

We could potentially connect at around 9:45am or 10am PST tomorrow (Wed June 6th)... would that work?

I'm honestly not sure how helpful I can be. On Jun 4 2018, at 11:35 pm, shiwanshujalan notifications@github.com wrote:

Hi, We are in india Time Zone. It will be really helpful if we could connect for one session, We will be able to present the entire scenario since this issue is intermittent and we cannot provide the entire data set. Thanks. — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-394597577), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA325KeH0F-_6BttafQq3pCp3UjBln0ks5t5iaggaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

9:45 am PST should be fine with us. Thanks for making the time. Shall I send you a WebEx link for tomorrow's meeting?

velvia commented 6 years ago

Sure thing, go ahead and send a WebEx.

On Jun 5 2018, at 11:44 am, shiwanshujalan notifications@github.com wrote:

9:45 am pst should be fine with us..Thanks for providing time..shall i send you webex link for tomorrow meet? — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-394817976), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32y7nFKy9_rz52pmFDhJ5WbihK4wpks5t5tF6gaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

Hi, PFB the meeting invite. https://persistent.webex.com/persistent/j.php?MTID=m62e37f692b46aa3c4d713281507ec0fa Password: Y8iHfTZP I have scheduled the meeting for 9:30 PM IST.

Let me know in case you have any issues while joining meeting. Thanks.

velvia commented 6 years ago

Hi I'm not quite sure if 9:30pm IST is same as 9:45am PST, hope it is :)

On Jun 5 2018, at 10:48 pm, shiwanshujalan notifications@github.com wrote:

Hi, PFB the meeting invite. https://persistent.webex.com/persistent/j.php?MTID=m62e37f692b46aa3c4d713281507ec0fa Password: Y8iHfTZP I have scheduled the meeting for 9:30 PM IST.

Let me know in case you have any issues while joining meeting. Thanks.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-394947710), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA323ekbGZqUaPNl5A6u_SdVP7yUTTBks5t520TgaJpZM4TOM-d).

shiwanshujalan commented 6 years ago

Hi, Our 9:30 PM IST is 10 AM PST.

shiwanshujalan commented 6 years ago

Hi,

We are on the WebEx. Let me know in case you are facing any trouble joining.

velvia commented 6 years ago

I am trying to join. Wifi here is very slow.

On Jun 6 2018, at 9:20 am, shiwanshujalan notifications@github.com wrote:

Hi, We are on the WEBex. Let me know in case you are facing any trouble in joining WEBex. — You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-395128770), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA329bqsJT-7nj4RiERmTtZMfN553FZks5t6AFrgaJpZM4TOM-d).

PallaviPersistent commented 6 years ago

Hi Evan,

Thanks for your time on the last WebEx call. As suggested, we tried to perform the ingestion by doing the following: (1) Removing columns having NULL values: we are still getting the NPE. (2) Removing string columns and then writing the data: we could write 100 million+ rows successfully.

So we suspect the issue is with the String data type.

We have few columns which are originally String and some which are originally Binary, Byte and have been converted to String. We tried ingesting columns originally String and received NPE. Snapshot of the column values ipv6:67 ipv6:118 ipv6:4703 ipv6:80 ipv6:443 ipv6:21 ipv6:43 ipv6:80 ipv6:1039 ipv6:80 ipv6:3125 ipv6:53 ipv6:5804 ipv6:22 ipv6:22 ipv6:21 ipv6:5884 ipv6:5922 ipv6:1308 ipv6:443 ipv6:68 ipv6:194 ipv6:118 ipv6:118 ipv6:53 ipv6:20 ipv6:67 ipv6:194 ipv6:445 ipv6:443 ipv6:43 ipv6:119 ipv6:118 ipv6:20 ipv6:21 ipv6:4560 ipv6:53 ipv6:80

Also we tried to ingest columns converted to String and it again resulted in NPE. Snapshot of the column values 53.218.50.27 96.35.2.107 186.127.105.182 141.195.32.146 183.85.199.206 84.90.252.182 219.123.171.140 194.65.227.130 115.233.179.117 128.50.50.250

Thanks, Pallavi

velvia commented 6 years ago

Hmm, weird. I'll try to run a unit test with these exact values and see what happens.

On Jun 10 2018, at 11:54 pm, PallaviPersistent notifications@github.com wrote:

Hi Evan, Thanks for your time on the last webex call. As suggested, we tried to perform the ingestion by doing the below -> Removing Columns having NULL values We still are getting NPE. -> We tried removing string columns and then writing the data and we could write 100 million + rows successfully.

So we suspect the issue to be with data type String. We have few columns which are originally String and some which are originally Binary, Byte and have been converted to String. We tried ingesting columns originally String and received NPE. Snapshot of the column values ipv6:67 ipv6:118 ipv6:4703 ipv6:80 ipv6:443 ipv6:21 ipv6:43 ipv6:80 ipv6:1039 ipv6:80 ipv6:3125 ipv6:53 ipv6:5804 ipv6:22 ipv6:22 ipv6:21 ipv6:5884 ipv6:5922 ipv6:1308 ipv6:443 ipv6:68 ipv6:194 ipv6:118 ipv6:118 ipv6:53 ipv6:20 ipv6:67 ipv6:194 ipv6:445 ipv6:443 ipv6:43 ipv6:119 ipv6:118 ipv6:20 ipv6:21 ipv6:4560 ipv6:53 ipv6:80

Also we tried to ingest columns converted to String and it again resulted in NPE. Snapshot of the column values 53.218.50.27 96.35.2.107 186.127.105.182 141.195.32.146 183.85.199.206 84.90.252.182 219.123.171.140 194.65.227.130 115.233.179.117 128.50.50.250

Thanks, Pallavi

— You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-396140406), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA321akTPl0X9zoNEhcSZMb5kdEdtU5ks5t7hQSgaJpZM4TOM-d).

PallaviPersistent commented 6 years ago

Hi Evan,

Any luck with the unit test on the values we provided?

Regards, Pallavi

velvia commented 6 years ago

Hi, I added a test just now and it passed on the ipv6 data. :( I have someone else who uses an older version in production and has seen similar issues. They say they see no issues with smaller or larger amounts of data when only one partition/thread is ingesting, but issues with multiple ones, and also only issues with larger amounts of data. Are you seeing the same thing?

I am in the middle of rewriting the storage format. It might help when the results are done. On Jul 11 2018, at 1:27 am, PallaviPersistent notifications@github.com wrote:

Hi Evan, Any luck with the unit test on the values we provided. Regards, Pallavi

— You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-404087595), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32xn0AyPmiP0jNOhrwlj-0P4pBAloks5uFbbigaJpZM4TOM-d).

velvia commented 6 years ago

Would you mind sending the NPE again one more time, the entire stack trace? Thanks.

On Jul 12 2018, at 8:21 am, Evan Chan velvia@gmail.com wrote:

Hi, I added a test just now and it passed on the ipv6 data. :( I have someone else who uses an older version in production and has seem similar issues. They say they see no issues with smaller or larger amounts of data when only one partition/thread is ingesting, but issues with multiple ones, and also only issues with larger amounts of data. Are you seeing the same thing?

I am in the middle of rewriting the storage format. It might help when the results are done. On Jul 11 2018, at 1:27 am, PallaviPersistent notifications@github.com wrote:

Hi Evan, Any luck with the unit test on the values we provided. Regards, Pallavi

— You are receiving this because you commented. Reply to this email directly, view it on GitHub (https://github.com/filodb/FiloDB/issues/152#issuecomment-404087595), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32xn0AyPmiP0jNOhrwlj-0P4pBAloks5uFbbigaJpZM4TOM-d).

PallaviPersistent commented 6 years ago

Hi Evan,

Yes, the issue mostly appears when we try to insert large amounts of data. We also see this error if we try inserting a small amount of data into the same dataset.

PFB the stack trace for NPE

[INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension 18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. [Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0) java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at 
kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 ERROR OneForOneStrategy: java.lang.NullPointerException at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46) at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314) at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304) at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59) at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86) at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60) at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94) at 
filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93) at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53) at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28) at filodb.core.Perftools$.subtrace(Perftools.scala:26) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79) at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62) at kamon.trace.Tracer$.withContext(TracerModule.scala:53) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61) at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79) at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop. 
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034) java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job 18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally) org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD.count(RDD.scala:1115) at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170) at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119) at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) ... 48 elided Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs at filodb.spark.package$.ingestRddRows(package.scala:105) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167) at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)