awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0
3.3k stars 538 forks

BatchNormal anomaly detector throws "java.lang.IncompatibleClassChangeError: Implementing class" #393

Open Blackbaud-PatrickMcDonald opened 3 years ago

Blackbaud-PatrickMcDonald commented 3 years ago

Environment: Databricks, Spark 3.0.1, Scala 2.12.10, Java 1.8.0_282, deequ 1.2.2-spark-3.0, pydeequ 1.0.1

I'm attempting to use the BatchNormalStrategy anomaly detection strategy, and it throws the following exception:

Py4JJavaError: An error occurred while calling o405.run.
: java.lang.IncompatibleClassChangeError: Implementing class
    at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
    at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
    at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
    at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
    at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
    at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
    at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
    at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
    at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
    at scala.Option.map(Option.scala:230)
    at com.amazon.deequ.constraints.AnalysisBasedConstraint.evaluate(AnalysisBasedConstraint.scala:60)
    at com.amazon.deequ.constraints.ConstraintDecorator.evaluate(Constraint.scala:56)
    at com.amazon.deequ.checks.Check.$anonfun$evaluate$1(Check.scala:1038)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at com.amazon.deequ.checks.Check.evaluate(Check.scala:1038)
    at com.amazon.deequ.VerificationSuite.$anonfun$evaluate$1(VerificationSuite.scala:269)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at com.amazon.deequ.VerificationSuite.evaluate(VerificationSuite.scala:269)
    at com.amazon.deequ.VerificationSuite.doVerificationRun(VerificationSuite.scala:132)
    at com.amazon.deequ.VerificationRunBuilder.run(VerificationRunBuilder.scala:173)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

To reproduce:

from pyspark.sql import SparkSession, Row
from pydeequ.repository import *
from pydeequ.verification import *
from pydeequ.anomaly_detection import *
from pydeequ.analyzers import *

spark = (SparkSession
    .builder
    .getOrCreate())
sc = spark.sparkContext

metricsRepositoryPath = "dbfs:/path/to/my/metrics.json"
metricsRepository = FileSystemMetricsRepository(spark, metricsRepositoryPath)

todaysDataset = sc.parallelize([
            Row(a=80, b=5),
            Row(a=3,  b=30),
            Row(a=10, b=5)]).toDF()

currTime = ResultKey.current_milli_time()
todaysKey = ResultKey(spark, currTime)

currResult = VerificationSuite(spark).onData(todaysDataset) \
    .useRepository(metricsRepository) \
    .addAnomalyCheck(BatchNormalStrategy(), Sum("a")) \
    .run()

Is this a compatibility issue with the version of Breeze being used?
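For context on where the failure originates: BatchNormalStrategy computes the mean and standard deviation of the metric history (via breeze.stats, which is the call failing at BatchNormalStrategy.scala:75) and flags any point outside mean ± factor × stddev. A rough plain-Python sketch of that logic (not deequ's actual implementation):

```python
from statistics import mean, stdev

def detect_batch_normal_anomalies(values, lower_factor=3.0, upper_factor=3.0):
    """Sketch of BatchNormalStrategy's idea: flag indices of values that
    fall outside mean +/- factor * stddev of the batch."""
    mu = mean(values)
    sigma = stdev(values)
    lower = mu - lower_factor * sigma
    upper = mu + upper_factor * sigma
    return [i for i, v in enumerate(values) if v < lower or v > upper]
```

So the strategy itself is straightforward; the exception above is thrown before any of this math runs, when the breeze classes are loaded.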

sakshaat commented 2 years ago

Hi, I'm seeing a similar issue with this anomaly check on EMR 6.4.0 (Spark 3.1.2) running deequ 2.0.0. This is the exception I get:

java.lang.NoClassDefFoundError: breeze/stats/package$
  at com.amazon.deequ.anomalydetection.BatchNormalStrategy.detect(BatchNormalStrategy.scala:75)
  at com.amazon.deequ.anomalydetection.AnomalyDetector.detectAnomaliesInHistory(AnomalyDetector.scala:98)
  at com.amazon.deequ.anomalydetection.AnomalyDetector.isNewPointAnomalous(AnomalyDetector.scala:60)
  at com.amazon.deequ.checks.Check$.isNewestPointNonAnomalous(Check.scala:1138)
  at com.amazon.deequ.checks.Check.$anonfun$isNewestPointNonAnomalous$1(Check.scala:433)
  at scala.runtime.java8.JFunction1$mcZD$sp.apply(JFunction1$mcZD$sp.java:23)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.runAssertion(AnalysisBasedConstraint.scala:108)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.pickValueAndAssert(AnalysisBasedConstraint.scala:74)
  at com.amazon.deequ.constraints.AnalysisBasedConstraint.$anonfun$evaluate$2(AnalysisBasedConstraint.scala:60)
The code that triggers it:

import com.amazon.deequ.anomalydetection.BatchNormalStrategy

def batchNormalStrategyTest() = {
    val strategy = BatchNormalStrategy(lowerDeviationFactor=Some(3.0), upperDeviationFactor=Some(3.0))
    datasetVerificationRunner(df, {v: VerificationRunBuilderWithRepository => v.addAnomalyCheck(strategy, Compliance("column_1", "cardinality(column_1) > 0"))})
}

batchNormalStrategyTest()

The datasetVerificationRunner function simply builds a VerificationRunBuilderWithRepository and applies the higher-order function to it to add an anomaly check. (It is structured this way so I can quickly test different strategies on a number of different instances of the same dataset.)

Basically something like

val verificationResult = addAnomalyCheck(VerificationSuite().onData(df.where(s"date='${date}'")).useRepository(repo).saveOrAppendResult(newKey)).run()
val resultDf = checkResultsAsDataFrame(spark, verificationResult)
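The higher-order pattern described above can be sketched in plain Python; the builder and method names below are stand-ins for illustration, not pydeequ's API:

```python
def run_with_check(build_suite, add_check):
    """Build the verification pipeline, let the caller attach whichever
    anomaly check is under test, then run it."""
    suite = build_suite()          # e.g. onData(df).useRepository(repo)...
    return add_check(suite).run()  # caller decides which check to attach

class FakeSuite:
    """Stand-in for a verification run builder, for demonstration only."""
    def __init__(self):
        self.checks = []
    def add_anomaly_check(self, strategy, analyzer):
        self.checks.append((strategy, analyzer))
        return self
    def run(self):
        return self.checks

result = run_with_check(
    FakeSuite,
    lambda s: s.add_anomaly_check("BatchNormalStrategy", "Sum(a)"),
)
```

This keeps the pipeline setup in one place while each test supplies only the check to attach.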

Is this a known issue with this class or am I using it incorrectly in this case?

mfridrikhson-tp commented 2 years ago

For anyone having trouble running it on Databricks: it appears to be a conflict between Breeze dependency versions. DBR 7.3 and later ship Breeze 1.x, which conflicts with the 0.13.2 version Deequ depends on.
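One way to confirm this kind of clash is to list the jars on the driver classpath and look for the same artifact present at two versions. A minimal sketch that works over a list of jar file names (plain Python, not a Databricks or Spark API):

```python
import re
from collections import defaultdict

def find_version_conflicts(jar_names):
    """Group jar file names by artifact and report artifacts that appear
    at more than one version (e.g. breeze pulled in twice)."""
    versions = defaultdict(set)
    for jar in jar_names:
        # Split "artifact-1.2.3.jar" into artifact and version parts.
        m = re.match(r"(.+?)[-_](\d[\w.]*)\.jar$", jar)
        if m:
            versions[m.group(1)].add(m.group(2))
    return {artifact: sorted(v) for artifact, v in versions.items() if len(v) > 1}
```

On a real cluster the jar names could come from the Spark classpath or the cluster's library list; any artifact reported with two versions is a candidate for the kind of IncompatibleClassChangeError / NoClassDefFoundError seen above.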

chenliu0831 commented 2 years ago

Is it possible to verify whether the issue goes away with the latest mainline?

pip install git+https://github.com/awslabs/python-deequ.git@7ec9f6f72839779f370a4753c0db26d6cf052203

natanaeldgsantos commented 10 months ago

I still get the same error using Databricks DBR 11.3 with Spark 3.3.0 and DBR 12.2 with Spark 3.3.2. I'm using pydeequ 1.2 and deequ 2.0.4.