cloudera-labs / envelope

Build configuration-driven ETL pipelines on Apache Spark
Apache License 2.0

NPE when accessing Spark Session in getExistingForFilters of RandomOutput #39

Closed. qiulin closed this 4 years ago.

qiulin commented 4 years ago

When trying to implement getExistingForFilters of RandomOutput and access the Spark session, I get a NullPointerException.

The code looks like this:


val list = Contexts.getSparkSession().read().jdbc(url, "($query) t999", properties).collectAsList()

The exception log looks like this:

19/08/09 17:06:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:56)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics$lzycompute(WholeStageCodegenExec.scala:511)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics(WholeStageCodegenExec.scala:510)
    at org.apache.spark.sql.execution.SparkPlan.resetMetrics(SparkPlan.scala:85)
    at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3361)
    at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3360)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
    at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2794)
    at com.thothinfo.ipd.dataflow.extension.output.JdbcExOutput.getExistingForFilters(JdbcExOutput.kt:248)
    at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:639)
    at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:598)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
jeremybeard commented 4 years ago

getExistingForFilters is run on the executors, so there is no driver SparkSession object available. If you're trying to create a JDBC random output, you'd need to use a JDBC client for this method, because Spark's JDBC integration can only be used from the driver.
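
For illustration, a minimal sketch of that approach using a plain JDBC client. The getExistingForFilters signature, connection settings, and table/column names below are assumptions for the example, not Envelope's exact API:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class JdbcClientExistingLookup {

  // Placeholder connection settings; in a real output these would come from the
  // step's configuration rather than hard-coded fields.
  private String url = "jdbc:postgresql://host:5432/dw";
  private String user = "user";
  private String password = "password";

  // Assumed signature: filter Rows in, matching existing Rows out.
  // This runs on the executors, so it uses plain JDBC instead of the SparkSession.
  public Iterable<Row> getExistingForFilters(Iterable<Row> filters) throws Exception {
    List<Row> existing = new ArrayList<>();
    try (Connection conn = DriverManager.getConnection(url, user, password);
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT key, value FROM dim_table WHERE key = ?")) {
      for (Row filter : filters) {
        // Look up each filter key directly against the RDBMS.
        stmt.setObject(1, filter.get(filter.fieldIndex("key")));
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            existing.add(RowFactory.create(rs.getObject("key"), rs.getObject("value")));
          }
        }
      }
    }
    return existing;
  }
}
```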

Out of curiosity, why do you want to create this as a random output?

qiulin commented 4 years ago

> getExistingForFilters is run on the executors, so there is no driver SparkSession object available. If you're trying to create a JDBC random output, you'd need to use a JDBC client for this method, because Spark's JDBC integration can only be used from the driver.
>
> Out of curiosity, why do you want to create this as a random output?

Thanks for your answer. We need to support creating Type 2 SCD dimension tables in an RDBMS data warehouse, such as Greenplum.