SAP-archive / HANAVora-Extensions

Spark extensions for business contexts
Apache License 2.0

SapSQLContext fails with DataFrame created through Cassandra #9

Open · ramnivas opened this issue 8 years ago

ramnivas commented 8 years ago

If I create a DataFrame through SapSQLContext and run a SQL query that performs a join, it fails with a serialization exception. However, identical code works fine with the plain SQLContext. Here is a minimal reproduction (I am using a self-join to keep things simple):

bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
                --packages datastax:spark-cassandra-connector:1.6.1-s_2.10 \
                --jars ~/.m2/repository/com/sap/spark/core/1.3.79/core-1.3.79.jar
import org.apache.spark.sql._

val sqlc = new SapSQLContext(sc)

val df = sqlc.read.format("org.apache.spark.sql.cassandra").
         options(Map("table" -> "employee", "keyspace" -> "organization")).load()
df.show()

df.registerTempTable("employee")

val bugDf = sqlc.sql("select * from employee report, employee manager")
bugDf.show()

This results in a long stack trace, but the gist of it is the following line:

java.io.NotSerializableException: org.apache.spark.sql.cassandra.CassandraSourceRelation

If you replace the line

val sqlc = new SapSQLContext(sc)

with

val sqlc = new SQLContext(sc)

everything works fine. It also works fine if the query doesn't involve any joins.
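
For example, a join-free query over the same table (a hypothetical query, same session as above) runs without error:

val okDf = sqlc.sql("select name, role from employee")
okDf.show()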

Here is the Cassandra CQL that you may find helpful in reproducing the problem.

CREATE KEYSPACE organization WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE organization.employee (id INT PRIMARY KEY, name TEXT, role TEXT, salary FLOAT, reports_to INT);

INSERT INTO organization.employee (id, name, role, salary) values (1, 'A', 'CEO', 100);
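
One possible workaround until this is fixed (an untested sketch; it assumes the failure comes from the non-serializable CassandraSourceRelation being captured in the task closure): rebuild the DataFrame from its RDD and schema before registering the temp table, so the registered plan points at an in-memory relation rather than the Cassandra one.

// Untested workaround sketch: createDataFrame(rdd, schema) wraps the rows in
// a plain RDD-backed relation, so the temp table no longer references the
// non-serializable CassandraSourceRelation.
val detachedDf = sqlc.createDataFrame(df.rdd, df.schema)
detachedDf.registerTempTable("employee")

val joinedDf = sqlc.sql("select * from employee report, employee manager")
joinedDf.show()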
burgerdev commented 8 years ago

Hi,

I was able to reproduce the problem with the latest master by following your instructions; full stack trace below. Going to investigate.

Cheers, Markus

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
    at $iwC$$iwC$$iwC.<init>(<console>:50)
    at $iwC$$iwC.<init>(<console>:52)
    at $iwC.<init>(<console>:54)
    at <init>(<console>:56)
    at .<init>(<console>:60)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.cassandra.CassandraSourceRelation
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.cassandra.CassandraSourceRelation, value: org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f)
    - field (class: org.apache.spark.sql.execution.datasources.LogicalRelation, name: relation, type: class org.apache.spark.sql.sources.BaseRelation)
    - object (class org.apache.spark.sql.execution.datasources.LogicalRelation, Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f
)
    - field (class: org.apache.spark.sql.execution.PhysicalSelfJoin, name: left, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
    - object (class org.apache.spark.sql.execution.PhysicalSelfJoin, PhysicalSelfJoin Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Inner, None, Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f[id#0,name#1,reports_to#2,role#3,salary#4] , [id#0,name#1,reports_to#2,role#3,salary#4,id#5,name#6,reports_to#7,role#8,salary#9], Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f
)
    - field (class: org.apache.spark.sql.execution.Project, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.Project, Project [id#0,name#1,reports_to#2,role#3,salary#4,id#5,name#6,reports_to#7,role#8,salary#9]
+- PhysicalSelfJoin Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Inner, None, Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f[id#0,name#1,reports_to#2,role#3,salary#4] , [id#0,name#1,reports_to#2,role#3,salary#4,id#5,name#6,reports_to#7,role#8,salary#9], Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f
)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
+- Project [id#0,name#1,reports_to#2,role#3,salary#4,id#5,name#6,reports_to#7,role#8,salary#9]
   +- PhysicalSelfJoin Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Inner, None, Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f[id#0,name#1,reports_to#2,role#3,salary#4] , [id#0,name#1,reports_to#2,role#3,salary#4,id#5,name#6,reports_to#7,role#8,salary#9], Relation[id#0,name#1,reports_to#2,role#3,salary#4] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f, Relation[id#5,name#6,reports_to#7,role#8,salary#9] org.apache.spark.sql.cassandra.CassandraSourceRelation@568514f
)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
    - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 80 more
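
For the record, the serialization stack above is fairly telling: the closure created in ConvertToSafe drags in the whole physical plan, and PhysicalSelfJoin (unlike the stock Spark operators) still holds its logical children as fields; the field left of type LogicalPlan points at the LogicalRelation that wraps the Cassandra BaseRelation. The plain SQLContext never plans a PhysicalSelfJoin, and Spark's standard physical operators don't retain logical plans, which is why the non-serializable relation normally never leaves the driver. Below is a minimal sketch of one possible fix, assuming PhysicalSelfJoin is a case class; only the field name left is taken from the trace, everything else is illustrative:

package org.apache.spark.sql.execution  // where the real operator lives, per the trace

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative sketch, not the actual class definition: marking the logical
// children @transient keeps them out of the serialized task closure. They are
// only needed at planning time on the driver, so it should be harmless that
// they deserialize to null on the executors.
case class PhysicalSelfJoinSketch(
    @transient left: LogicalPlan,   // field `left` from the serialization stack
    @transient right: LogicalPlan,  // assumed symmetric counterpart
    child: SparkPlan)               // the physical scan that actually executes
  extends UnaryNode {

  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[InternalRow] = child.execute()
}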