awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.

[BUG] Performance for building row-level results scales poorly with number of checks #576

Closed marcantony closed 3 months ago

marcantony commented 3 months ago

Describe the bug
When generating the row-level results DataFrame using the VerificationResult::rowLevelResultsAsDataFrame method, performance grinds to a halt for large numbers of checks (hundreds to thousands): https://github.com/awslabs/deequ/blob/d495234e4cf2c36e8ca220f156bc21e3f57a3bac/src/main/scala/com/amazon/deequ/VerificationResult.scala#L93-L104

This is due to the DataFrame::withColumn method being used iteratively within it. Here's a relevant note from the withColumn docs:

this method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select with the multiple columns at once.
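For illustration, here is a minimal sketch (not Deequ's actual code; the column names and values are made up) contrasting the iterative withColumn pattern with adding all columns in a single projection:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

object RowLevelColumnsSketch {
  // Hypothetical result columns, one per check (in Deequ these are derived
  // from the evaluated checks; here they are just constant literals).
  val resultColumns: Map[String, Column] =
    (1 to 400).map(i => s"check_$i" -> lit(true)).toMap

  // Pattern used today: every withColumn call stacks another projection on
  // top of the plan, so plan size (and analysis time) grows with each check.
  def addIteratively(data: DataFrame): DataFrame =
    resultColumns.foldLeft(data) { case (df, (name, col)) =>
      df.withColumn(name, col)
    }

  // What the Spark docs recommend: add all columns in one projection.
  def addInOnePass(data: DataFrame): DataFrame = {
    val newCols = resultColumns.toSeq.map { case (name, col) => col.as(name) }
    data.select(data.col("*") +: newCols: _*)
  }
}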

To Reproduce

  1. Run a verification suite with a large number of checks (300-400 is enough to start feeling the pain on my machine).
  2. Generate a row-level result DataFrame using VerificationResult::rowLevelResultsAsDataFrame.

The performance issue is with generating the DataFrame, so you don't need to call an action on it.

Here's a test I wrote which shows the issue:

// Imports assumed for the deequ classes and ScalaTest helpers used below;
// adjust package paths if your test setup differs.
import java.time.{Duration, Instant}

import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.{SparkContextSpec, VerificationResult, VerificationSuite}
import org.scalatest.Inspectors
import org.scalatest.flatspec.AnyFlatSpec

class LargeChecksResultsTest extends AnyFlatSpec with SparkContextSpec with Inspectors {
  "Deequ" should "handle row-level results for large numbers of checks" in withSparkSession { spark =>
    import spark.implicits._
    val df = Seq("1", "2", "3").toDF("data")

    val lengths = (50 to 400 by 50)
    forAll(lengths) { length =>
      val checks = Seq.tabulate(length)(i =>
        Check(CheckLevel.Error, s"check-$i").isComplete("data"))

      val verificationResult = VerificationSuite()
        .onData(df)
        .addChecks(checks)
        .run()

      println("Gathering row-level results")
      val start = Instant.now()
      val rowLevelResults = VerificationResult.rowLevelResultsAsDataFrame(spark, verificationResult, df)
      val duration = Duration.between(start, Instant.now())
      println(s"${rowLevelResults.schema.fields.length} columns in row level results")
      println(s"Duration was ${duration.toMillis}ms")
      println()

      assert(verificationResult.status == CheckStatus.Success)
    }
  }
}

Expected behavior
Generating the row-level results should be performant for large numbers of checks, at least up to the limit of a verification suite itself.

Additional context
Here's the output from running the above test on my machine:

Gathering row-level results
51 columns in row level results
Duration was 363ms

Gathering row-level results
101 columns in row level results
Duration was 735ms

Gathering row-level results
151 columns in row level results
Duration was 1641ms

Gathering row-level results
201 columns in row level results
Duration was 3421ms

Gathering row-level results
251 columns in row level results
Duration was 6321ms

Gathering row-level results
301 columns in row level results
Duration was 10691ms

Gathering row-level results
351 columns in row level results
Duration was 16275ms

Gathering row-level results
401 columns in row level results
Duration was 23591ms
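
Note that the duration grows much faster than linearly: each additional 50 checks adds noticeably more time than the previous 50 did.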
marcantony commented 3 months ago

The fix here is pretty simple: the method just iterates over the map to add each (name, column) pair to the DataFrame one at a time, so it can use the withColumns method instead and add them all at once.
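
A rough sketch of that change (not the actual patch; the helper below is hypothetical, and withColumns with a Map[String, Column] argument requires Spark 3.3+):

import org.apache.spark.sql.{Column, DataFrame}

object WithColumnsFixSketch {
  // Hypothetical helper mirroring the shape of the loop in
  // rowLevelResultsAsDataFrame: `columns` maps each check's result column
  // name to the Column expression computing its row-level outcome.
  def addResultColumns(data: DataFrame, columns: Map[String, Column]): DataFrame =
    // Before: columns.foldLeft(data) { case (df, (name, col)) => df.withColumn(name, col) }
    // After: one withColumns call, i.e. a single projection regardless of check count.
    if (columns.isEmpty) data else data.withColumns(columns)
}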