awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0
3.25k stars 533 forks
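By way of illustration, a minimal "unit test for data" in the style of Deequ's README; this sketch is not part of the original report, and the column name is borrowed from the snippets later in this issue:

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

// A "unit test for data": declare properties the dataset must satisfy.
val verificationResult = VerificationSuite()
  .onData(data)                      // any Spark DataFrame
  .addCheck(
    Check(CheckLevel.Error, "basic review")
      .hasSize(_ >= 1)               // the dataset is non-empty
      .isComplete("productName"))    // no nulls in this column
  .run()

if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the checks.")
}
```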

Unable to run the demo code on data science workbench environment of cloudera #420

Closed · ankit-khare-2015 closed 2 years ago

ankit-khare-2015 commented 2 years ago

Spark version: (screenshot attached; version not transcribed)

```scala
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

case class RawData(
  productName: String,
  totalNumber: String,
  status: String,
  valuable: String
)

// `spark` is the session pre-created by the workbench notebook environment
val rows = spark.sparkContext.parallelize(Seq(
  RawData("thingA", "13.0", "IN_TRANSIT", "true"),
  RawData("thingA", "5", "DELAYED", "false"),
  RawData("thingB", null, "DELAYED", null),
  RawData("thingC", null, "IN_TRANSIT", "false"),
  RawData("thingD", "1.0", "DELAYED", "true"),
  RawData("thingC", "7.0", "UNKNOWN", null),
  RawData("thingC", "24", "UNKNOWN", null),
  RawData("thingE", "20", "DELAYED", "false"),
  RawData("thingA", "13.0", "IN_TRANSIT", "true"),
  RawData("thingA", "5", "DELAYED", "false"),
  RawData("thingB", null, "DELAYED", null),
  RawData("thingC", null, "IN_TRANSIT", "false"),
  RawData("thingD", "1.0", "DELAYED", "true"),
  RawData("thingC", "17.0", "UNKNOWN", null),
  RawData("thingC", "22", "UNKNOWN", null),
  RawData("thingE", "23", "DELAYED", "false")
))

val data = spark.createDataFrame(rows)
print(data.getClass)

val suggestionResult = ConstraintSuggestionRunner()
  .onData(data)
  .addConstraintRules(Rules.DEFAULT)
  .run()
```

Error

```scala
val suggestionResult = ConstraintSuggestionRunner()
  .onData(sqlDF)
  .addConstraintRules(Rules.DEFAULT)
  .run()
```

```
Name: Unknown Error
Message: <console>:56: error: type mismatch;
 found   : org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.DataFrame
    (which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.org.apache.spark.sql.DataFrame
    (which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
       val suggestionResult = ConstraintSuggestionRunner().onData(data)
                                                                  ^
```
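For readers hitting the same wall: a type mismatch where "found" and "required" print the same type usually means the two DataFrame classes were loaded by different classloaders, which can happen when a notebook gateway and the user code each create their own Spark context. A minimal sketch of the single-session pattern, assuming the workbench allows reusing the active session:

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() hands back the already-running session if one exists,
// so the DataFrame and Deequ see the same Spark classes.
val spark = SparkSession.builder().getOrCreate()

// Build the data through this single session before handing it to Deequ.
val data = spark.createDataFrame(rows)
```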

ankit-khare-2015 commented 2 years ago

Any update here? I really need this to be working.

ankit-khare-2015 commented 2 years ago

@joemcmahon I hope someone can help me understand this issue.

ankit-khare-2015 commented 2 years ago

Apparently the code below solves the issue:

```scala
import org.apache.spark.sql.SparkSession
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

val spark = SparkSession.builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

// RawData is the case class defined in the first snippet above
val rows = spark.sparkContext.parallelize(Seq(
  RawData("thingA", "13.0", "IN_TRANSIT", "true"),
  RawData("thingA", "5", "DELAYED", "false"),
  RawData("thingB", null, "DELAYED", null),
  RawData("thingC", null, "IN_TRANSIT", "false"),
  RawData("thingD", "1.0", "DELAYED", "true"),
  RawData("thingC", "7.0", "UNKNOWN", null),
  RawData("thingC", "24", "UNKNOWN", null),
  RawData("thingE", "20", "DELAYED", "false"),
  RawData("thingA", "13.0", "IN_TRANSIT", "true"),
  RawData("thingA", "5", "DELAYED", "false"),
  RawData("thingB", null, "DELAYED", null),
  RawData("thingC", null, "IN_TRANSIT", "false"),
  RawData("thingD", "1.0", "DELAYED", "true"),
  RawData("thingC", "17.0", "UNKNOWN", null),
  RawData("thingC", "22", "UNKNOWN", null),
  RawData("thingE", "23", "DELAYED", "false")
))

val data = spark.createDataFrame(rows)
data.printSchema
print(data.getClass)

val suggestionResult = ConstraintSuggestionRunner()
  .onData(data)
  .addConstraintRules(Rules.DEFAULT)
  .run()

suggestionResult.constraintSuggestions.foreach { case (column, suggestions) =>
  suggestions.foreach { suggestion =>
    println(s"Constraint suggestion for '$column':\t${suggestion.description}\n" +
      s"The corresponding scala code is ${suggestion.codeForConstraint}\n")
  }
}
```

I guess an explicit conversion was needed
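As a possible follow-up once the suggestions come back, Deequ's documented pattern is to fold them into a single Check and evaluate it with VerificationSuite. A sketch along those lines; the Warning level and the check name are arbitrary choices here:

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

// Collect every suggested constraint into one check and evaluate it.
val allConstraints = suggestionResult.constraintSuggestions
  .flatMap { case (_, suggestions) => suggestions.map(_.constraint) }
  .toSeq

val generatedCheck = Check(CheckLevel.Warning, "generated constraints", allConstraints)

val verificationResult = VerificationSuite()
  .onData(data)
  .addChecks(Seq(generatedCheck))
  .run()

if (verificationResult.status == CheckStatus.Success) {
  println("The data passed all suggested constraints.")
}
```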

ankit-khare-2015 commented 2 years ago

I was able to fix this issue by myself. We have to be a bit more proactive in providing support here; in case some help is needed on the dev side, do let me know. Thanks.