LucaCanali / sparkMeasure

This is the development repository for sparkMeasure, a tool and library designed for efficient analysis and troubleshooting of Apache Spark jobs. It focuses on easing the collection and examination of Spark metrics, making it a practical choice for both developers and data engineers.
Apache License 2.0
693 stars 144 forks source link

taskVals.toDF java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals #11

Closed KennethNagin closed 6 years ago

KennethNagin commented 6 years ago

I'm trying to use the deserializer. However, the toDF call that follows throws an exception — note that the ListBuffer returned by readSerializedTaskMetrics actually contains StageVals objects, which suggests the serialized file was written with stage-level (not task-level) metrics:

Details: val taskVals = ch.cern.sparkmeasure.Utils.readSerializedTaskMetrics("/tmp/stageMetrics.serialized") scala> val taskVals = ch.cern.sparkmeasure.Utils.readSerializedTaskMetrics("/tmp/stageMetrics.serialized") taskVals: scala.collection.mutable.ListBuffer[ch.cern.sparkmeasure.TaskVals] = ListBuffer(StageVals(0,0,parquet at NativeMethodAccessorImpl.java:0,1531905390534,1531905392912,2378,1,1878,123,368,259,5,78,4802,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0), StageVals(1,1,count at NativeMethodAccessorImpl.java:0,1531905395225,1531905405201,9976,1003,1028843,104666,103972,21334,218,35221,1714153,144,0,0,0,100818815,11697322,0,0,0,0,0,0,0,1123,56844,1003), StageVals(1,2,count at NativeMethodAccessorImpl.java:0,1531905405221,1531905405591,370,1,336,176,26,19,1,0,2382,2,0,0,0,0,0,0,0,55,56844,1003,28,975,0,0,0), StageVals(2,3,collect at /root/SparCle/workload/sqlquery/data-layout-read.py:20,1531905405925,1531905413411,7486,1003,813482,25942,6350,1765,58,9238,1707192,144,0,0,0,100818815,156304569,0,0,0,0,0,... 
scala> val taskMetricsDF = taskVals.toDF() java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).jobId AS jobId#444 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).stageId AS stageId#445 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).index AS index#446L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).launchTime AS launchTime#447L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).finishTime AS finishTime#448L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).duration AS duration#449L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).schedulerDelay AS schedulerDelay#450L staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorId, true) AS executorId#451 staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).host, true) AS host#452 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).taskLocality AS taskLocality#453 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).speculative AS speculative#454 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).gettingResultTime AS gettingResultTime#455L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).successful AS successful#456 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorRunTime AS executorRunTime#457L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorCpuTime AS executorCpuTime#458L 
assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorDeserializeTime AS executorDeserializeTime#459L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).executorDeserializeCpuTime AS executorDeserializeCpuTime#460L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).resultSerializationTime AS resultSerializationTime#461L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).jvmGCTime AS jvmGCTime#462L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).resultSize AS resultSize#463L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).numUpdatedBlockStatuses AS numUpdatedBlockStatuses#464 assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).diskBytesSpilled AS diskBytesSpilled#465L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).memoryBytesSpilled AS memoryBytesSpilled#466L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).peakExecutionMemory AS peakExecutionMemory#467L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).recordsRead AS recordsRead#468L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).bytesRead AS bytesRead#469L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).recordsWritten AS recordsWritten#470L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).bytesWritten AS bytesWritten#471L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleFetchWaitTime AS shuffleFetchWaitTime#472L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleTotalBytesRead AS shuffleTotalBytesRead#473L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleTotalBlocksFetched AS shuffleTotalBlocksFetched#474L assertnotnull(assertnotnull(input[0, 
ch.cern.sparkmeasure.TaskVals, true])).shuffleLocalBlocksFetched AS shuffleLocalBlocksFetched#475L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleRemoteBlocksFetched AS shuffleRemoteBlocksFetched#476L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleWriteTime AS shuffleWriteTime#477L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleBytesWritten AS shuffleBytesWritten#478L assertnotnull(assertnotnull(input[0, ch.cern.sparkmeasure.TaskVals, true])).shuffleRecordsWritten AS shuffleRecordsWritten#479L at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464) at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:464) at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:377) at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:213) ... 
48 elided Caused by: java.lang.ClassCastException: ch.cern.sparkmeasure.StageVals incompatible with ch.cern.sparkmeasure.TaskVals at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287) ... 60 more

scala>