vericast / spylon-kernel

Jupyter kernel for scala and spark

Can't use case class in the Scala notebook #40

Open leeivan opened 7 years ago

leeivan commented 7 years ago

The Docker image: jupyter/all-spark-notebook:latest

How Docker is started:

docker run -it --rm -p 8888:8888 jupyter/all-spark-notebook:latest

or, for an existing container:

docker ps -a
docker start -i containerID

The steps:

Visit http://localhost:8888, start a spylon-kernel notebook, and run the code below.

import spark.implicits._
val p = spark.sparkContext.textFile("../Data/person.txt")
val pmap = p.map(_.split(","))
pmap.collect()

The output:

res0: Array[Array[String]] = Array(Array(Barack, Obama, 53), Array(George, Bush, 68), Array(Bill, Clinton, 68))
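
For reference, person.txt is presumably a plain comma-separated file along these lines (inferred from the output above):

Barack,Obama,53
George,Bush,68
Bill,Clinton,68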

case class Persons(first_name: String, last_name: String, age: Int)
val personRDD = pmap.map(p => Persons(p(0), p(1), p(2).toInt))
personRDD.take(1)

The error message:

org.apache.spark.SparkDriverExecutionException: Execution error
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1186)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 39 elided
Caused by: java.lang.ArrayStoreException: [LPersons;
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043)
  at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The above code works in the spark-shell. From the error message, I suspect the driver does not correctly handle the case class Persons when assembling the collected RDD partitions.
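
A workaround sometimes suggested for REPL class-loader problems of this kind (not verified on spylon-kernel) is to avoid building an Array of the notebook-defined case class on the driver, for example by collecting plain tuples and applying the case class afterwards. A minimal sketch:

// Collect tuples so the element type of the result array is a standard
// Scala class rather than the notebook-defined Persons.
val tupleRDD = pmap.map(p => (p(0), p(1), p(2).toInt))
tupleRDD.take(1)  // Array[(String, String, Int)]

// Apply the case class once the data is back on the driver.
val persons = tupleRDD.collect().map { case (f, l, a) => Persons(f, l, a) }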

parente commented 7 years ago

After poking around and doing a bit of research, I have no insight into what's wrong. There have been similar reports of issues like this in the past, with slight variation in the details.

Interestingly enough, the case class works fine when using DataFrames and Datasets:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Build the schema explicitly instead of relying on inference.
val first_name = StructField("first_name", StringType)
val last_name = StructField("last_name", StringType)
val age = StructField("age", IntegerType)
val schema = StructType(Array(first_name, last_name, age))

val df = spark.read.schema(schema).csv("person.txt")

// Uses the Persons case class defined earlier; the encoder for .as[Persons]
// comes from import spark.implicits._
val ds = df.as[Persons]

ds.collect()
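
A variant that sidesteps the case class entirely, keeping the collected data as generic Rows, might also be worth trying (untested here):

// Collect generic Rows and unpack the fields by position.
val rows = df.collect()  // Array[org.apache.spark.sql.Row]
rows.map(r => (r.getString(0), r.getString(1), r.getInt(2)))
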
leeivan commented 7 years ago

I used your example to test my spylon-kernel environment, and it didn't work either. When I executed ds2.collect, the error message was:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.ClassCastException: DataRow cannot be cast to DataRow
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
  ... 37 elided
Caused by: java.lang.ClassCastException: DataRow cannot be cast to DataRow
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more

Have you run into this problem? I haven't figured out what causes it, whether it is the environment or something else. My Docker image is leeivan/spark-lab-spylon.

walmaaoui commented 3 years ago

Hello, any updates on this issue? It blocks the use of Delta Lake (a table format from Databricks) with spylon. I am getting:

Caused by: java.lang.ArrayStoreException: org.apache.spark.sql.delta.actions.AddFile
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:75)
  at scala.Array$.slowcopy(Array.scala:152)
  at scala.Array$.copy(Array.scala:178)
  at scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
  at scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
  at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
  at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:316)
  at scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:315)
  at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
  at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:324)
  at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:321)
  at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
  at org.apache.spark.sql.delta.files.DelayedCommitProtocol.commitJob(DelayedCommitProtocol.scala:59)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)

zhangfiona commented 2 years ago

I used the Apache Toree Scala kernel in JupyterLab with a case class and had the same problem. I found the JIRA https://issues.apache.org/jira/browse/TOREE-428, but the bug also exists in the latest version. Any update now?