tuplejump / calliope-release

The repository for the G.A. codebase of Calliope. For the E.A. codebase, request early access to the development repo at http://tuplejump.github.io/calliope/
Apache License 2.0

NullPointerException when updating a table with a composite key #9

Closed sonyjv closed 10 years ago

sonyjv commented 10 years ago

First of all, thank you for this wonderful library. It is great work.

While using it, I have run into an issue updating a table with a composite key.

The Cassandra table is as follows:

CREATE TABLE Search (
    search_id int, category text, listing_id int,  monthYear text, distance double, term text,
    PRIMARY KEY ((category, listing_id), monthYear, search_id)
);

From a Spark job I am trying to insert some records using Calliope 0.9.0-U1-C2-EA. The relevant code is below.

implicit def keyMarshaller(x: SearchedCategory): CQLRowKeyMap =
  Map("category" -> x.category, "listing_id" -> x.listingID,
      "monthYear" -> x.monthYear, "search_id" -> x.searchID.toInt)

implicit def rowMarshaller(x: SearchedCategory): CQLRowValues =
  List(x.distance, x.term)

val categoryQuery = CasBuilder.cql3.withColumnFamily("mydb", "search")
  .saveWithQuery("update search set distance = ?, term = ?")
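
For context, here is a minimal sketch of the record class these marshallers assume; the field names and types are inferred from the marshallers and the table definition above, so treat them as assumptions rather than the original class. The actual write (visible in the stack trace below as saveAsNewAPIHadoopFile inside CassandraRDDFunctions.scala) goes through Calliope's RDD save extension.

// Hypothetical record class, reconstructed from the marshallers and the
// table definition above; names and types are assumptions.
case class SearchedCategory(
  category:  String,
  listingID: Int,
  monthYear: String,
  searchID:  String, // a guess; the key marshaller applies .toInt to it
  distance:  Double,
  term:      String
)

// The RDD of SearchedCategory records is then written with Calliope's save
// extension (imported via com.tuplejump.calliope.Implicits._), roughly:
//   searchedCategories.cql3SaveToCassandra(categoryQuery)
// The exact save method name may differ across Calliope versions.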

I am getting the following exception (in cqlsh the same query works without any problem). If I remove one column from the composite key (i.e. go from 4 key columns to 3), it works without any issue.

Could you please guide me in resolving this issue?

java.io.IOException: java.lang.NullPointerException
    at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:245)
Caused by: java.lang.NullPointerException
    at org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:181)
    at org.apache.cassandra.thrift.Cassandra$execute_prepared_cql3_query_args$execute_prepared_cql3_query_argsStandardScheme.write(Cassandra.java:52956)
    at org.apache.cassandra.thrift.Cassandra$execute_prepared_cql3_query_args$execute_prepared_cql3_query_argsStandardScheme.write(Cassandra.java:52883)
    at org.apache.cassandra.thrift.Cassandra$execute_prepared_cql3_query_args.write(Cassandra.java:52816)
    at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:63)
    at org.apache.cassandra.thrift.Cassandra$Client.send_execute_prepared_cql3_query(Cassandra.java:1814)
    at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1804)
    at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:229)
14/04/10 13:43:26 WARN scheduler.TaskSetManager: Lost TID 22 (task 16.0:0)
14/04/10 13:43:26 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: java.lang.NullPointerException
    at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:245)
14/04/10 13:43:26 ERROR scheduler.TaskSetManager: Task 16.0:0 failed 1 times; aborting job
14/04/10 13:43:26 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 16.0 from pool 
14/04/10 13:43:26 INFO scheduler.DAGScheduler: Failed to run saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:185
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
milliondreams commented 10 years ago

@sonyjv It would be great if you could run it in debug mode and show the insert query being executed in both cases.
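
One way to surface the generated statement (a sketch, not from this thread) is to raise the log level for the relevant classes before running the job. The logger names below are assumptions; point them at whichever classes actually log the prepared query in your setup.

import org.apache.log4j.{Level, Logger}

// Sketch: with log4j (as used by Spark 0.9.x), enable DEBUG for the classes
// that prepare and execute the CQL statement. Logger names are assumptions.
Logger.getLogger("com.tuplejump.calliope").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.cassandra.hadoop.cql3").setLevel(Level.DEBUG)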

sonyjv commented 10 years ago

Thanks Rohit for the quick response.

For the 4-column composite key, i.e. the table

CREATE TABLE Search (
    search_id int, category text, listing_id int, monthYear text, distance double, term text,
    PRIMARY KEY ((category, listing_id), monthYear, search_id)
);

the query generated is

update search set distance = ?, term = ? WHERE "category" = ? AND "listing_id" = ? AND "monthyear" = ? AND "search_id" = ?

For the 3-column composite key, i.e. the table

CREATE TABLE Search (
    search_id int, category text, listing_id int, monthYear text, distance double, term text,
    PRIMARY KEY ((category, listing_id), search_id)
);

the generated query is

update search set monthYear = ?, distance = ?, name = ? WHERE "category" = ? AND "listing_id" = ? AND "search_id" = ?

The latter is working fine and records are inserted.

 category             | listing_id | search_id | distance | monthyear | term
----------------------+------------+-----------+----------+-----------+------------------
          Restaurants |    3554276 |      1001 |     1.47 |   2001-02 | Cocktail Lounges
 Japanese Restaurants |     223123 |      1001 |     8.44 |   2001-02 | Cocktail Lounges
          Restaurants |    4618106 |      1001 |     4.22 |   2001-02 | Cocktail Lounges
          Restaurants |    4618118 |      1001 |     6.69 |   2001-02 | Cocktail Lounges

sonyjv commented 10 years ago

@Rohit, I found the problem. It was a mistake on my side. I had defined the implicit keyMarshaller as

implicit def keyMarshaller(x: SearchedCategory): CQLRowKeyMap =
  Map("category" -> x.category, "listing_id" -> x.listingID,
      "monthYear" -> x.monthYear, "search_id" -> x.searchID.toInt)

but in Cassandra the column was created as "monthyear", not "monthYear" (unquoted identifiers are folded to lowercase). After changing the keyMarshaller to

implicit def keyMarshaller(x: SearchedCategory): CQLRowKeyMap =
  Map("category" -> x.category, "listing_id" -> x.listingID,
      "monthyear" -> x.monthYear, "search_id" -> x.searchID.toInt)

the issue was resolved.
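
That is consistent with the stack trace above: the CQL record writer looks up each key column's value by the folded (lowercase) column name, so a map keyed with "monthYear" yields nothing for "monthyear", and the resulting null is what fails in TBinaryProtocol.writeBinary. A plain-Scala illustration of the lookup (not Calliope's or Cassandra's actual code):

import java.util.{HashMap => JHashMap}

// Illustration only: Java map lookups (as used on the Cassandra Hadoop side)
// are case-sensitive and return null on a miss.
val keyValues = new JHashMap[String, AnyRef]()
keyValues.put("monthYear", "2001-02")

println(keyValues.get("monthYear")) // 2001-02
println(keyValues.get("monthyear")) // null -> later NPE when serialized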

Thanks again for your help and for the wonderful library. We are going to use it for our implementation. :)

ghost commented 10 years ago

@sonyjv I'm not the Rohit you are looking for. :)