awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0

[BUG] Unable to serialize Histogram with binningUdf when used with useRepository #500

Open psyking841 opened 1 year ago

psyking841 commented 1 year ago

Describe the bug When using the Histogram analyzer (with a binning UDF) together with the useRepository API, I get an "Unable to serialize Histogram with binningUdf!" error.

Stacktrace:

An error was encountered:
java.lang.IllegalArgumentException: Unable to serialize Histogram with binningUdf!
  at com.amazon.deequ.repository.AnalyzerSerializer$.serialize(AnalysisResultSerde.scala:314)
  at com.amazon.deequ.repository.AnalyzerSerializer$.serialize(AnalysisResultSerde.scala:221)
  at com.google.gson.internal.bind.TreeTypeAdapter.write(TreeTypeAdapter.java:81)
  at com.google.gson.Gson.toJson(Gson.java:704)
  at com.google.gson.Gson.toJsonTree(Gson.java:597)
  at com.google.gson.internal.bind.TreeTypeAdapter$GsonContextImpl.serialize(TreeTypeAdapter.java:158)
  at com.amazon.deequ.repository.AnalyzerContextSerializer$.$anonfun$serialize$2(AnalysisResultSerde.scala:182)
  at com.amazon.deequ.repository.AnalyzerContextSerializer$.$anonfun$serialize$2$adapted(AnalysisResultSerde.scala:179)
  at scala.collection.immutable.Map$Map2.foreach(Map.scala:273)
  at com.amazon.deequ.repository.AnalyzerContextSerializer$.serialize(AnalysisResultSerde.scala:179)
  at com.amazon.deequ.repository.AnalyzerContextSerializer$.serialize(AnalysisResultSerde.scala:170)
  at com.google.gson.internal.bind.TreeTypeAdapter.write(TreeTypeAdapter.java:81)
  at com.google.gson.Gson.toJson(Gson.java:704)
  at com.google.gson.Gson.toJsonTree(Gson.java:597)
  at com.google.gson.internal.bind.TreeTypeAdapter$GsonContextImpl.serialize(TreeTypeAdapter.java:158)
  at com.amazon.deequ.repository.AnalysisResultSerializer$.serialize(AnalysisResultSerde.scala:149)
  at com.amazon.deequ.repository.AnalysisResultSerializer$.serialize(AnalysisResultSerde.scala:139)
  at com.google.gson.internal.bind.TreeTypeAdapter.write(TreeTypeAdapter.java:81)
  at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:69)
  at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:97)
  at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:61)
  at com.google.gson.Gson.toJson(Gson.java:704)
  at com.google.gson.Gson.toJson(Gson.java:683)
  at com.google.gson.Gson.toJson(Gson.java:638)
  at com.amazon.deequ.repository.AnalysisResultSerde$.serialize(AnalysisResultSerde.scala:90)
  at com.amazon.deequ.repository.fs.FileSystemMetricsRepository.save(FileSystemMetricsRepository.scala:57)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$saveOrAppendResultsIfNecessary$2(AnalysisRunner.scala:233)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$saveOrAppendResultsIfNecessary$2$adapted(AnalysisRunner.scala:225)
  at scala.Option.foreach(Option.scala:407)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$saveOrAppendResultsIfNecessary$1(AnalysisRunner.scala:225)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$saveOrAppendResultsIfNecessary$1$adapted(AnalysisRunner.scala:224)
  at scala.Option.foreach(Option.scala:407)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.saveOrAppendResultsIfNecessary(AnalysisRunner.scala:224)
  at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:204)
  at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
  ... 61 elided

To Reproduce Steps to reproduce the behavior: run the code below against any df that requires a binning UDF.

I ran it in a Jupyter notebook; each code block below runs in its own notebook cell.

val analysisResult: AnalyzerContext = (AnalysisRunner
          .onData(df)
          .addAnalyzer(Size())
          .addAnalyzer(Histogram("score", Some(scoreBinningUdf)))
          .useRepository(FileSystemMetricsRepository(spark, "s3://path/to/metrics/file"))
          .saveOrAppendResult(resultKey)
          .run())
val analysisResults = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResults.show(100, truncate = false)

Here is my UDF; you should be able to use it as-is:

val scoreBinningUdf = udf((score: Double) => {
    if (score < 0.10) {
        "lower"
    } else if (score > 0.90) {
        "upper"
    } else {
        "mid"
    }
})
  1. Run the blocks above and observe the error.

Expected behavior The code above should run and persist the metrics without error.


Additional context There are two ways to make the code above work:

  1. Remove the two lines below:
    .useRepository(FileSystemMetricsRepository(spark, "s3://path/to/metrics/file"))
    .saveOrAppendResult(resultKey)

    Or

  2. Remove the UDF from the Histogram analyzer, i.e. apply the UDF to the df first (to create a new column) and then run the analyzer on that column.
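To make workaround 2 concrete, here is a hedged sketch (untested; it reuses the df, spark, and resultKey names from above, and scoreBin is a column name I made up). The binning rule is kept as a plain function so it can be checked without Spark, and Histogram is given only the pre-computed column name so the analyzer itself carries no UDF:

```scala
// The same binning rule as scoreBinningUdf, written as a plain function
// so it is testable outside Spark.
def binScore(score: Double): String =
  if (score < 0.10) "lower"
  else if (score > 0.90) "upper"
  else "mid"

// With Spark in scope (sketch, assuming the df/spark/resultKey from above):
//   import org.apache.spark.sql.functions.{col, udf}
//   val binned = df.withColumn("scoreBin", udf(binScore _)(col("score")))
//   AnalysisRunner
//     .onData(binned)
//     .addAnalyzer(Histogram("scoreBin"))   // no binningUdf, so it serializes
//     .useRepository(FileSystemMetricsRepository(spark, "s3://path/to/metrics/file"))
//     .saveOrAppendResult(resultKey)
//     .run()
```

Since the UDF is applied before the analyzer runs, the Histogram object handed to the repository holds no function reference, which is exactly what the serializer in AnalysisResultSerde.scala rejects.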

Therefore, I think the problem is an incompatibility between Histogram with a binningUdf and useRepository: the repository serializes analyzers to JSON, and a Histogram carrying a UDF cannot be serialized.