sasha-polev / aerospark

Aerospike Spark Connector
Apache License 2.0

select count(distinct column2) from aero where intColumn1 > -1000000 and intColumn1 < 100000 throws LUA exception #3

Closed (andrewmilkowski closed this issue 8 years ago)

andrewmilkowski commented 9 years ago

Hi, using the latest git master and Spark 1.3.1, I received the following error.

thanks in advance for the assistance

[amilkowski@localhost aerospark]$ which spark-shell
/opt/local/src/spark/spark-1.3.1/bin/spark-shell
[amilkowski@localhost aerospark]$

[amilkowski@localhost aerospark]$ asinfo

6 :  edition
     Aerospike Community Edition
7 :  version
     Aerospike Community Edition build 3.5.14
8 :  build
     3.5.14
9 :  services

I executed the following:

CREATE TEMPORARY TABLE aero USING com.osscube.spark.aerospike.rdd OPTIONS (initialHost "127.0.0.1:3000", select "select column1,column2,intColumn1 from test.one_million where intColumn1 between -1000 and 1000000", partitionsPerServer "2");

select * from aero;

select count(distinct column2) from aero where intColumn1 > -1000000 and intColumn1 < 100000;

     > select count(distinct column2) from aero where intColumn1  > -1000000 and intColumn1 < 100000;

UDF Filters applied: [column2, intColumn1],[3, intColumn1, -1000000, 100000],[3, intColumn1, -1000000, 100000]
useUDF: true
UDF params: (3,intColumn1,,List((-1000000,100000)))-(3,intColumn1,,List((-1000000,100000)))

15/07/30 16:31:04 ERROR Executor: Exception in task 0.0 in stage 34.0 (TID 29)
java.lang.Exception: (gen:0),(exp:0),(bins:(FAILURE:UDF: Execution Error 2 : /opt/aerospike/usr/udf/lua/spark_filters.lua:32: bad argument #1 to 'ipairs' (table expected, got nil)))
        at com.osscube.spark.aerospike.rdd.AerospikeRDD$$anonfun$compute$4.apply(AerospikeRDD.scala:111)
        at com.osscube.spark.aerospike.rdd.AerospikeRDD$$anonfun$compute$4.apply(AerospikeRDD.scala:92)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/07/30 16:31:04 WARN TaskSetManager: Lost task 0.0 in stage 34.0 (TID 29, localhost): java.lang.Exception: (gen:0),(exp:0),(bins:(FAILURE:UDF: Execution Error 2 : /opt/aerospike/usr/udf/lua/spark_filters.lua:32: bad argument #1 to 'ipairs' (table expected, got nil)))
        [same executor stack trace as above]

15/07/30 16:31:04 ERROR TaskSetManager: Task 0 in stage 34.0 failed 1 times; aborting job
15/07/30 16:31:04 ERROR SparkSQLDriver: Failed in [select count(distinct column2) from aero where intColumn1 > -1000000 and intColumn1 < 100000]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 1 times, most recent failure: Lost task 0.0 in stage 34.0 (TID 29, localhost): java.lang.Exception: (gen:0),(exp:0),(bins:(FAILURE:UDF: Execution Error 2 : /opt/aerospike/usr/udf/lua/spark_filters.lua:32: bad argument #1 to 'ipairs' (table expected, got nil)))
        [same executor stack trace as above]
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/07/30 16:31:04 ERROR CliDriver: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 1 times, most recent failure: Lost task 0.0 in stage 34.0 (TID 29, localhost): java.lang.Exception: (gen:0),(exp:0),(bins:(FAILURE:UDF: Execution Error 2 : /opt/aerospike/usr/udf/lua/spark_filters.lua:32: bad argument #1 to 'ipairs' (table expected, got nil)))
        [same executor stack trace and driver stacktrace as above]

andrewmilkowski commented 9 years ago

If I cast the numeric into a string, this resolves the problem, but this cannot be right; I want to have the predicates as numerics.

spark-sql> select * from aero order by intColumn1;
abc  abc  -1000
abc  abc  -100
abc  abc  100
abc  abc  1000
abc  abc  1500
abc  abc  1700
abc  abc  10000
abc  abc  100000
Time taken: 0.94 seconds, Fetched 8 row(s)
spark-sql>

Then the filter on intColumn1 succeeds (notice the single quotes in intColumn1 >= '-100'):

spark-sql> select * from aero where intColumn1 >= '-100' order by intColumn1;
abc  abc  -100
abc  abc  100
abc  abc  1000
abc  abc  1500
abc  abc  1600
abc  abc  1700
abc  abc  10000
abc  abc  100000
Time taken: 0.408

andrewmilkowski commented 9 years ago

Although if I use the predicate '-102', the query does work properly.

So does this list sort as strings (as a string, '-102' comes after '-1000')? With the predicate:

select * from aero where intColumn1 >= '-102' order by intColumn1;

a character-based sort would return

100 1000 10000 100000 1500 1700

but it does return the properly (numerically) ordered list:

spark-sql> select * from aero where intColumn1 >= '-102' order by intColumn1;
abc  abc  -100
abc  abc  100
abc  abc  1000
abc  abc  1500
abc  abc  1600
abc  abc  1700
abc  abc  10000
abc  abc  100000
Time taken: 0.287 seconds, Fetched 8 row(s)
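
To make the string-versus-numeric ordering concrete, here is a small illustrative Scala snippet (hypothetical, not connector code) comparing a numeric sort of the intColumn1 values above with a purely lexicographic one:

    // Illustrative only: the intColumn1 values from the rows above.
    val values = List(-1000, -100, 100, 1000, 1500, 1700, 10000, 100000)

    // Numeric ordering (what `order by intColumn1` actually returns):
    println(values.sorted)
    // -> List(-1000, -100, 100, 1000, 1500, 1700, 10000, 100000)

    // Lexicographic ordering (what a character-based sort of the same values would give):
    println(values.map(_.toString).sorted)
    // -> List(-100, -1000, 100, 1000, 10000, 100000, 1500, 1700)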

sasha-polev commented 9 years ago

Hi Andrew

Thanks for reporting this; I need to test with Aerospike Community Edition build 3.5.14.

Thanks Sasha

sasha-polev commented 9 years ago

Hi, can you try replacing the Aerospike client dependency in the POM file with this one:

  <dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-client</artifactId>
    <version>3.1.0</version>
  </dependency>

This pins the client version to the last tested one.
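
If it helps, a quick way to re-check after pinning the client is to rebuild the connector jar and re-run the same statements from spark-shell. A minimal sketch (assuming Spark 1.3.1 with the rebuilt jar on the classpath and the same test.one_million set as above):

    // Re-test sketch; `sc` is the SparkContext provided by spark-shell.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Register the temporary table exactly as in the original report.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE aero
        |USING com.osscube.spark.aerospike.rdd
        |OPTIONS (initialHost "127.0.0.1:3000",
        |  select "select column1,column2,intColumn1 from test.one_million where intColumn1 between -1000 and 1000000",
        |  partitionsPerServer "2")""".stripMargin)

    // Re-run the failing query with numeric predicates to confirm the Lua error is gone.
    sqlContext.sql(
      "select count(distinct column2) from aero where intColumn1 > -1000000 and intColumn1 < 100000"
    ).collect().foreach(println)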

Thanks Sasha