databricks-industry-solutions / hls-payer-mrf-sparkstreaming

Spark Structured Streaming for Payer MRF use case

Runtime Error when executing in spark 3.3.0 (DBR 11.3 LTS) #20

Closed by balbarka 1 month ago

cwoodcox commented 1 year ago

Came here just now to report this… unsure if it's the same error as @balbarka's, but I'm running those same versions on Azure Databricks.

Here's my PySpark:

# Stream-parse the in-network rates file with the payer-mrf source, using a 64 MB buffer
file_location = "dbfs:/FileStore/tables/uhc/2023_01_01_United_Healthcare_Services__Inc__Third_Party_Administrator_Core_POS_579_in_network_rates_.gz"

df = spark.readStream.format("payer-mrf").option("buffersize", 67108864).load(file_location)

# Process everything available in a single micro-batch, appending to a Delta table
query = df.writeStream.trigger(once=True).toTable(
    "payer_transparency_ingest",
    format="delta",
    outputMode="append",
    checkpointLocation=file_location + "_checkpoint",
)

and my stack trace:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.LogicalRDD.<init>(Lscala/collection/Seq;Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/catalyst/plans/physical/Partitioning;Lscala/collection/Seq;ZLorg/apache/spark/sql/SparkSession;)V
    at com.databricks.labs.sparkstreaming.jsonmrf.JsonMRFSource.getBatch(JsonMRFSource.scala:221)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:710)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:706)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:372)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1003)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:369)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:327)
    at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:305)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:380)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$4(StreamExecution.scala:269)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:269)
balbarka commented 1 year ago

@cwoodcox can you confirm the DBR version where you hit the exception, and try again on 10.4 LTS if that wasn't it?

I'd like to isolate whether this is Spark-version related.
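
For a quick check, the Spark version a cluster is actually running can be printed from a notebook cell (a Scala one-liner, assuming the Databricks-provided spark session):

println(spark.version)  // "3.2.1" on DBR 10.4 LTS, "3.3.0" on DBR 11.3 LTS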

cwoodcox commented 1 year ago

I created another cluster with 10.4 LTS and Spark 3.2.1, and the same notebook worked swimmingly.

cwoodcox commented 1 year ago

I'm not experienced in Scala, but digging through the source of the API it's complaining about, it doesn't seem to have changed between 3.2.1 and 3.3.0, so it's puzzling.
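
For what it's worth, the JVM descriptor in the NoSuchMethodError decodes to the constructor below (a sketch only: the parameter names and the Seq element types are my guesses, since erasure strips them from the descriptor; only the raw types come from the trace):

// org.apache.spark.sql.execution.LogicalRDD.<init> as the connector jar expects it:
//   output: Seq[...]                    <- Lscala/collection/Seq;
//   rdd: RDD[...]                       <- Lorg/apache/spark/rdd/RDD;
//   outputPartitioning: Partitioning    <- ...catalyst/plans/physical/Partitioning;
//   outputOrdering: Seq[...]            <- Lscala/collection/Seq;
//   isStreaming: Boolean                <- Z
//   session: SparkSession               <- Lorg/apache/spark/sql/SparkSession;
// returning void (V), i.e. a constructor

Since a runtime NoSuchMethodError means the jar was compiled against a constructor signature the running Spark no longer exposes, even a change that looks source-compatible in a diff (for example, appending parameters with default values) would still break binary compatibility here.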

zavoraad commented 1 year ago

Adding this to make a jar available for Spark 3.3.0 compatibility.
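
For anyone rebuilding the connector themselves in the meantime, a minimal sketch of the relevant build.sbt change (this assumes the project builds with sbt and that the fix is recompiling against the newer Spark; the exact settings in this repo may differ):

// build.sbt sketch: compile the connector against the Spark version the cluster runs
ThisBuild / scalaVersion := "2.12.14"

libraryDependencies ++= Seq(
  // "provided" because Databricks supplies Spark on the cluster at runtime
  "org.apache.spark" %% "spark-sql" % "3.3.0" % "provided"
)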