nats-io / nats-spark-connector

Apache License 2.0

Parsing headers causes a map error #32

Open stoddabr opened 1 month ago

stoddabr commented 1 month ago

What version were you using?

The NATS connector V2 code, as is (unmodified).

What environment was the server running in?

Databricks notebook DBR 13.3

Is this defect reproducible?

Yes, it occurs every time for me.

Given the capability you are leveraging, describe your expectation?

No error.

Given the expectation, what is the defect you are observing?

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 23) (REDACTED_IP_ADDRESS executor 2): java.lang.ClassCastException: scala.collection.immutable.Map$Map4 cannot be cast to org.apache.spark.sql.catalyst.util.MapData

Stack trace:

    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getMap(rows.scala:50)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getMap$(rows.scala:50)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getMap(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:120)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$2.hasNext(InMemoryRelation.scala:287)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:227)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:312)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1654)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1581)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1645)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1438)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1392)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:422)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:372)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:900)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:903)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:798)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Replacing the following code with `headers = None` makes the error go away, which is how I know this is the source of the issue:

    val headers: Option[Map[String, Seq[String]]] =
      Option(message.getHeaders).map(
        _.entrySet()
          .toSet
          .map((me: util.Map.Entry[String, util.List[String]]) =>
            (me.getKey, me.getValue.asScala.toSeq))
          .toMap)
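
For context, Spark's `InternalRow` expects map columns to hold Catalyst `MapData` (with `UTF8String` keys and values), not a plain `scala.collection.immutable.Map`, which is exactly what the `ClassCastException` above reports. A minimal sketch of the kind of conversion the row would need if the map column were kept (header names and values below are made up for illustration, not taken from the connector):

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.catalyst.util.MapData

    // A plain Scala map like the one the connector builds from message.getHeaders.
    val headers: Map[String, Seq[String]] =
      Map("Nats-Msg-Id" -> Seq("abc-123"), "Content-Type" -> Seq("application/json"))

    // Convert to the Catalyst representation (UTF8String keys, ArrayData values);
    // passing the plain Map straight into the row is what getMap later rejects.
    val catalystHeaders: MapData =
      CatalystTypeConverters.convertToCatalyst(headers).asInstanceOf[MapData]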
stoddabr commented 1 month ago

I've found a better fix that preserves the header information as a String by using the public serialization method on Headers:

    // Serialize the NATS headers to their wire-format bytes and keep them as a
    // UTF-8 string; messages without headers produce a null value.
    val headers: String = Option(message.getHeaders).orNull match {
      case headers: io.nats.client.impl.Headers => new String(headers.getSerialized, "UTF-8")
      case _ => null
    }

This requires setting the headers column to type String and wrapping the string in Spark's unsafe UTF8String.
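
For illustration, a minimal sketch of what those two changes could look like (the field name and variable names here are assumptions, not the connector's actual identifiers):

    import org.apache.spark.sql.types.{StringType, StructField}
    import org.apache.spark.unsafe.types.UTF8String

    // Declare the headers column as a plain string in the schema...
    val headersField = StructField("headers", StringType, nullable = true)

    // ...and wrap the serialized header string as Spark's unsafe UTF8String when
    // building the internal row (a missing header block becomes a null column value).
    val headersValue: UTF8String =
      if (headers != null) UTF8String.fromString(headers) else null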

jnmoyne commented 3 weeks ago

LGTM on the fix you describe, using the public serialization method and changing the column type to String. Thanks for this; please create a PR for the change.