awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0

Correlation analyzer failing in tammruka/2.0.0-spark-3.2.0 #399

Open deenkar opened 2 years ago

deenkar commented 2 years ago

Hi @TammoR

When running a Correlation analyzer on tammruka/2.0.0-spark-3.2.0, it fails with the error below and does not compute the correlation.

case class NewNumRawData(totalNumber: Integer, count: Integer)

val rows1 = ss.sparkContext.parallelize(Seq(
  NewNumRawData(1, 2),
  NewNumRawData(3, 4),
  NewNumRawData(5, 6),
  NewNumRawData(7, 8),
  NewNumRawData(9, 10),
  NewNumRawData(1, 2),
  NewNumRawData(1, 2),
  NewNumRawData(1, 2),
  NewNumRawData(1, 2),
  NewNumRawData(1, 2),
  NewNumRawData(1, 2)
))
val rawData = ss.createDataFrame(rows1)
val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(rawData)
  // define analyzers that compute metrics
  .addAnalyzer(Correlation("totalNumber", "count"))
  // compute metrics
  .run()
}

It fails with the following error in the analysis result:

AnalyzerContext(Map(Correlation(totalNumber,count,None) -> DoubleMetric(Mutlicolumn,Correlation,totalNumber,count,Failure(com.amazon.deequ.analyzers.runners.MetricCalculationRuntimeException: java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.sql.Row))))
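As a cross-check (and a possible interim workaround), the same Pearson correlation can be computed with Spark's built-in `DataFrame.stat.corr`, which bypasses the failing Deequ analyzer entirely. This is a minimal sketch assuming `rawData` is the DataFrame built above:

```scala
// Cross-check with Spark's built-in Pearson correlation (plain Spark API, not Deequ).
// Assumes `rawData` is the DataFrame created in the reproduction above.
val r: Double = rawData.stat.corr("totalNumber", "count")
// In the sample data, count == totalNumber + 1 (perfectly linear),
// so the correlation should be ~1.0.
println(r)
```

If this returns the expected value while the Deequ analyzer fails, that would narrow the problem down to the analyzer's result extraction rather than the underlying computation.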

deenkar commented 2 years ago

Tests in AnalyzerTests were also failing.

Pearson correlation:

"yield 1.0 for maximal conditionally informative columns" in withSparkSession { sparkSession =>
  val df = getDfWithConditionallyInformativeColumns(sparkSession)
  Correlation("att1", "att2").calculate(df) shouldBe DoubleMetric(
    Entity.Mutlicolumn,
    "Correlation",
    "att1,att2",
    Success(1.0)
  )
}

DoubleMetric(Mutlicolumn,Correlation,att1,att2,Failure(com.amazon.deequ.analyzers.runners.MetricCalculationRuntimeException: java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.sql.Row)) was not equal to DoubleMetric(Mutlicolumn,Correlation,att1,att2,Success(1.0))
ScalaTestFailureLocation: org.scalatest.matchers.MatchersHelper$ at (AnalyzerTests.scala:664)
Expected: DoubleMetric(Mutlicolumn,Correlation,att1,att2,Success(1.0))
Actual:   DoubleMetric(Mutlicolumn,Correlation,att1,att2,Failure(com.amazon.deequ.analyzers.runners.MetricCalculationRuntimeException: java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.sql.Row))

org.scalatest.exceptions.TestFailedException: DoubleMetric(Mutlicolumn,Correlation,att1,att2,Failure(com.amazon.deequ.analyzers.runners.MetricCalculationRuntimeException: java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.sql.Row)) was not equal to DoubleMetric(Mutlicolumn,Correlation,att1,att2,Success(1.0))
  at org.scalatest.matchers.MatchersHelper$.indicateFailure(MatchersHelper.scala:339)
  at org.scalatest.matchers.should.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6982)
  at com.amazon.deequ.analyzers.AnalyzerTests.$anonfun$new$152(AnalyzerTests.scala:668)
  at com.amazon.deequ.SparkContextSpec.withSparkSession(SparkContextSpec.scala:33)
  at com.amazon.deequ.SparkContextSpec.withSparkSession$(SparkContextSpec.scala:30)
  at com.amazon.deequ.analyzers.AnalyzerTests.withSparkSession(AnalyzerTests.scala:32)
  at com.amazon.deequ.analyzers.AnalyzerTests.$anonfun$new$151(AnalyzerTests.scala:662)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
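For reference, the metric value the failing test expects can be verified by hand. The sketch below is plain Scala (no Spark, no Deequ) implementing the standard Pearson formula over the same pairs as the reproduction above, where count == totalNumber + 1, so the columns are perfectly linear and the correlation should be 1.0:

```scala
// Hand-rolled Pearson correlation to confirm the expected metric value.
// This is an illustrative sketch, not Deequ's implementation.
object CorrelationCheck {
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.length == ys.length && xs.nonEmpty)
    val n = xs.length
    val mx = xs.sum / n
    val my = ys.sum / n
    // Covariance numerator and the two standard-deviation factors.
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
    val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
    cov / (sx * sy)
  }

  def main(args: Array[String]): Unit = {
    // Same values as the NewNumRawData rows in the reproduction above.
    val totalNumber = Seq(1.0, 3.0, 5.0, 7.0, 9.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
    val count       = totalNumber.map(_ + 1) // perfectly linear relationship
    println(pearson(totalNumber, count))     // expected to be ~1.0
  }
}
```

Since the expected value is well defined, the `Success(1.0)` assertion in the test is correct, and the failure points at the analyzer's result handling (the `Double`-to-`Row` cast) rather than the test itself.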