Open mattseburn opened 1 year ago
There was an earlier similar question: #155.
There are a couple of possible approaches:
Write a UDF that takes the JSON string, parses it into a JSON object using any JSON library you like, and turns that object back into a JSON string with sorted keys. That should turn your JSON into comparable strings.
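A minimal sketch of such a UDF, assuming Jackson is on the classpath (Spark ships with it) and that `.diff` comes from `uk.co.gresearch.spark.diff._`. The `JsonCanonicalizer` object and the `canonicalJson` name are made up for illustration. The JSON is read into plain Java maps and lists, because Jackson's `ORDER_MAP_ENTRIES_BY_KEYS` feature sorts map entries on write, including nested ones:

```scala
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import uk.co.gresearch.spark.diff._ // assumed import that adds .diff to DataFrames

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical helper: one lazily created Jackson mapper per JVM (driver and executors).
object JsonCanonicalizer {
  private lazy val mapper: ObjectMapper =
    new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)

  // Parse into plain Java maps/lists, then serialize again with map keys sorted.
  // Null input stays null; malformed JSON makes Jackson throw.
  def canonicalize(json: String): String =
    if (json == null) null
    else mapper.writeValueAsString(mapper.readValue(json, classOf[Object]))
}

val canonicalJson = udf((json: String) => JsonCanonicalizer.canonicalize(json))

val left = Seq((1, """{"foo":"__foo__","bar":"__bar__"}""")).toDF("id", "json")
val right = Seq((1, """{"bar":"__bar__","foo":"__foo__"}""")).toDF("id", "json")

// Both sides now carry the same canonical string, so the plain string diff can compare them.
left.withColumn("json", canonicalJson($"json"))
  .diff(right.withColumn("json", canonicalJson($"json")), "id")
  .show(false)
```

This needs no schema at all, and it also normalizes whitespace. Array order is kept as is, so arrays that differ only in element order would still count as a change.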
Spark is limited when it comes to JSON with arbitrary schema. There are two types of arbitrary schemata: each row has a different schema, or all rows have the same schema but that schema is arbitrary.
Spark can parse a JSON string column into a struct column, but it always needs the schema of the column, and all rows share that one schema. The method schema_of_json does not work on a string column, only on a string literal. So given one example JSON string, it can provide the schema; from_json then turns the JSON string into a struct of that schema, which can be diffed:
val sampleJson = left.select($"json").as[String].take(1).head
val leftJson = left.select($"id", from_json($"json", schema_of_json(lit(sampleJson))).as("json"))
val rightJson = right.select($"id", from_json($"json", schema_of_json(lit(sampleJson))).as("json"))
leftJson.diff(rightJson, "id").show(false)
+----+---+------------------+------------------+
|diff|id |left_json |right_json |
+----+---+------------------+------------------+
|N |1 |{__bar__, __foo__}|{__bar__, __foo__}|
+----+---+------------------+------------------+
Use the same sampleJson for leftJson and rightJson so that those dataframes are comparable. Be aware that the sample JSON string becomes part of the query, which has memory limitations (<1MB).
If rows have different schemata, you need to find a common schema and use that. In Databricks, there is schema_of_json_agg, which might work on a string column and provide you with an aggregated / common schema.
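Outside Databricks, one possible way to get a common schema is to let Spark's JSON reader infer it from the strings themselves. This is a swapped-in technique, not something confirmed in this thread: `spark.read.json` accepts a `Dataset[String]` and merges the fields it finds across all rows. The sketch below assumes the same `uk.co.gresearch.spark.diff._` import as the other examples, and the second row of sample data is made up to show differing schemata:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import uk.co.gresearch.spark.diff._ // assumed import that adds .diff to DataFrames

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Row 2 is illustrative only: the two sides do not have the same keys.
val left = Seq(
  (1, """{"foo":"__foo__","bar":"__bar__"}"""),
  (2, """{"foo":"x"}""")
).toDF("id", "json")
val right = Seq(
  (1, """{"bar":"__bar__","foo":"__foo__"}"""),
  (2, """{"foo":"x","baz":1}""")
).toDF("id", "json")

// Infer one schema from the JSON strings of both sides.
// This runs an extra job over the data before the actual diff.
val allJson = left.select($"json").union(right.select($"json")).as[String]
val commonSchema = spark.read.json(allJson).schema

// Parse both sides with the same schema so the struct columns are comparable.
val leftJson = left.select($"id", from_json($"json", commonSchema).as("json"))
val rightJson = right.select($"id", from_json($"json", commonSchema).as("json"))
leftJson.diff(rightJson, "id").show(false)
```

Because the schema is a value on the driver and not a literal inside the query, this avoids embedding a sample JSON string in the plan. Keys missing from a row come out as null in the struct, and keys that hold conflicting types across rows may be inferred as strings.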
Similarly, you can parse the JSON string column into a map column if you know the depth of the JSON (and it has the same depth for every key). So this is also very limited.
Here is an example of a flat JSON (no nested JSON objects):
val left = Seq((1, """{"foo":"__foo__","bar":"__bar__"}""")).toDF("id", "json")
val right = Seq((1, """{"bar":"__bar__","foo":"__foo__"}""")).toDF("id", "json")
val leftMap = left.select($"id", from_json($"json", MapType(StringType, StringType)))
val rightMap = right.select($"id", from_json($"json", MapType(StringType, StringType)))
val options = DiffOptions.default.withComparator(DiffComparators.map[String, String], MapType(StringType, StringType))
new Differ(options).diff(leftMap, rightMap, "id").show
+----+---+--------------------------------+--------------------------------+
|diff|id |left_entries |right_entries |
+----+---+--------------------------------+--------------------------------+
|C |1 |{foo -> __foo__, bar -> __bar__}|{bar -> __bar__, foo -> __foo__}|
+----+---+--------------------------------+--------------------------------+
Apparently, the map comparator does not see that the two maps are identical. I will check if this is a bug in the comparator.
The flat Map can also be exploded and compared:
val leftExp = leftMap.select($"id", explode($"entries"))
val rightExp = rightMap.select($"id", explode($"entries"))
leftExp.diff(rightExp, "id", "key").show(false)
+----+---+---+----------+-----------+
|diff|id |key|left_value|right_value|
+----+---+---+----------+-----------+
|N |1 |bar|__bar__ |__bar__ |
|N |1 |foo|__foo__ |__foo__ |
+----+---+---+----------+-----------+
This is extremely helpful, thank you so much!
I'll reply here if I have more questions.
Hi there, we have datasets with JSON variant columns, and need to do a JSON comparison on the strings they contain so that, for example,
{"foo":"__foo__","bar":"__bar__"}
-> {"bar":"__bar__","foo":"__foo__"}
is reported as an N and not a C. Additionally, we would ideally not have to specify the schema of the JSON and have it inferred automatically.
What's the best way to approach this? I've tried creating a DiffComparator like this:
But keep getting this error:
I also tried creating an Equiv comparator, but was unable to get even the example from the docs to work. What am I missing?
Thanks!