snowplow / snowplow-rdb-loader

Stores Snowplow enriched events in Redshift, Snowflake and Databricks

Fix NPEs in TypesAccumulator #1363

Closed istreeter closed 1 month ago

istreeter commented 1 month ago

We've seen exceptions in Spark executors like:

java.lang.NullPointerException: Cannot invoke "scala.collection.mutable.Set.isEmpty()" because the return value of "com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.TypesAccumulator.accum()" is null

The error comes from our Spark Accumulator for accumulating Iglu types. This is similar to an issue previously seen in Spark's own CollectionAccumulator. That issue was fixed in Spark by making the accumulator's internal state non-final and synchronizing access to it. So here we make the exact same change to our own Accumulator.
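For illustration, here is a minimal sketch of that pattern: non-final state, created lazily, with every access synchronized. It is not the actual TypesAccumulator code. The class name `SketchTypesAccumulator`, the `String` element type and the `getOrCreate` helper are assumptions made for this example, and it deliberately avoids the Spark dependency so that it is self-contained.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an accumulator of Iglu types; not the project's class.
final class SketchTypesAccumulator {
  // Non-final state that is allowed to be null. Another thread may see this
  // object before the field has been assigned, so no caller reads it directly.
  private var accum: mutable.Set[String] = null

  // Create the set on first use. Callers must hold the lock on `this`.
  private def getOrCreate: mutable.Set[String] = {
    if (accum == null) accum = mutable.Set.empty[String]
    accum
  }

  def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)

  def add(key: String): Unit = this.synchronized {
    getOrCreate.add(key)
    ()
  }

  def merge(other: SketchTypesAccumulator): Unit = {
    // Take the snapshot under the other object's lock first, so that the two
    // locks are never held at the same time.
    val theirs = other.value
    this.synchronized {
      theirs.foreach(k => getOrCreate.add(k))
    }
  }

  def reset(): Unit = this.synchronized { accum = null }

  // Return an immutable copy so callers cannot mutate the internal set.
  def value: Set[String] = this.synchronized(getOrCreate.toSet)
}
```

Because every public method goes through `getOrCreate` while holding the lock, a call such as `isZero` still works when the field is null, which is the situation reported in the exception above.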

It is a rare race condition which is hard to reproduce.

istreeter commented 1 month ago

We have another accumulator, the TimestampsAccumulator. But I think that does not need the same fix, because its internal state is an Option, and at runtime a None in Scala is just represented by null. Besides, we have never seen an NPE with that accumulator, and Spark core has other accumulators which do not synchronize on every access.