LucaCanali / sparkMeasure

This is the development repository for sparkMeasure, a tool and library designed for efficient analysis and troubleshooting of Apache Spark jobs. It focuses on easing the collection and examination of Spark metrics, making it a practical choice for both developers and data engineers.

spark 2.3.2 problems #23

Closed: dmitrybugakov closed this issue 5 years ago

dmitrybugakov commented 5 years ago

Hi. I'm trying to use this library. Code samples:

build.sbt

val hdpMinorVersion = "3.1.0.0-78"
val hadoopVersion = "3.1.1" + "." + hdpMinorVersion
val sparkVersion = "2.3.2" + "." + hdpMinorVersion

lazy val localResolvers = Seq(
  "mvnrepository" at "https://mvnrepository.com/artifact/",
  "Hortonworks HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
  "Hortonworks Other Dependencies" at "http://repo.hortonworks.com/content/groups/public"
)

val projectResolvers: Seq[Resolver] = Seq(Resolver.defaultLocal, Resolver.mavenLocal) ++ localResolvers

resolvers := projectResolvers

lazy val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)

lazy val hdpDependencies = Seq(
  "com.hortonworks.hive" %% "hive-warehouse-connector" % "1.0.0.3.0.1.0-187" % Provided intransitive()
)

lazy val staticAnalyzer = Seq(
  compilerPlugin(dependency = "org.wartremover" %% "wartremover" % "2.3.4")
)

libraryDependencies ++= sparkDependencies ++ hdpDependencies ++ staticAnalyzer ++ Seq(
  "io.monix" %% "monix" % "2.3.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  //  "io.monix" %% "monix-eval" % "2.3.3",
  "ch.cern.sparkmeasure" %% "spark-measure" % "0.13",
  "io.monix" %% "monix-cats" % "2.3.3",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

spark-shell
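
The session below assumes sparkMeasure is already on the shell classpath; the usual way to get it there, per the sparkMeasure docs, is the --packages flag (the _2.11 suffix here is an assumption, matching the default Scala version of Spark 2.3):

spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.13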

scala> val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics: ch.cern.sparkmeasure.TaskMetrics = TaskMetrics(org.apache.spark.sql.SparkSession@69c0bae6,false)

scala> taskMetrics.runAndMeasure
   def runAndMeasure[T](f: => T): T

scala> taskMetrics.runAndMeasure(spark.sql("select * from test.test"))
Hive Session ID = 04e7280b-0a45-4fad-867f-f1447faf6bf4
Time taken: 4895 ms
19/02/20 09:01:58 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
19/02/20 09:01:58 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics
scala.MatchError: (elapsedTime,null) (of class scala.Tuple2)                    
  at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
  at ch.cern.sparkmeasure.TaskMetrics$$anonfun$report$1.apply(taskmetrics.scala:206)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at ch.cern.sparkmeasure.TaskMetrics.report(taskmetrics.scala:206)
  at ch.cern.sparkmeasure.TaskMetrics.printReport(taskmetrics.scala:215)
  at ch.cern.sparkmeasure.TaskMetrics.runAndMeasure(taskmetrics.scala:282)
  ... 49 elided

What am I doing wrong?
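
One likely culprit, going by the (elapsedTime,null) in the trace: spark.sql(...) is lazy and by itself runs no Spark stages, so there may be no task rows to aggregate when report() runs. A minimal sketch of measuring an actual action instead, assuming the same test.test table:

// Trigger execution with an action such as show() or collect(),
// so tasks actually run and produce metrics to aggregate.
taskMetrics.runAndMeasure(spark.sql("select * from test.test").show())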