This is an experimental prototype for building a statistical model to repair tabular data errors on Apache Spark which is a parallel and distributed framework for large-scale data processing. Clean and consistent data is one of major interests for downstream analytics; clean data makes machine learning and BI reporting more accurate and consistent data with constraints (e.g., functional dependences) is important for efficient query plans. Therefore, data repairing is a first step for a reliable analytics pipeline.
$ git clone https://github.com/maropu/spark-data-repair-plugin.git
$ cd spark-data-repair-plugin
# This repository includes a simple wrapper script `bin/python` to create
# a conda virtual environment to resolve the required dependencies
# (e.g., Python 3.7 and PySpark 3.2), and then
# launch a Python VM with our plugin.
$ ./bin/python
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.0
/_/
Using Python version 3.7.11 (default, Jul 27 2021 07:03:16)
SparkSession available as 'spark'.
Delphi APIs (version 0.1.0-spark3.2-EXPERIMENTAL) available as 'delphi'.
# Loads CSV data having seven NULL cells
>>> spark.read.option("header", True).csv("./testdata/adult.csv").createOrReplaceTempView("adult")
>>> spark.table("adult").show()
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
|tid| Age| Education| Occupation| Relationship| Sex| Country| Income|
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
| 0|31-50|Some-college| Craft-repair| Husband| Male|United-States|LessThan50K|
| 1| >50|Some-college| Exec-managerial| Own-child|Female|United-States|LessThan50K|
| 2|31-50| Bachelors| Sales| Husband| Male|United-States|LessThan50K|
| 3|22-30| HS-grad| Craft-repair| Own-child| null|United-States|LessThan50K|
| 4|22-30| HS-grad| Farming-fishing| Husband|Female|United-States|LessThan50K|
| 5| null|Some-college| Craft-repair| Husband| Male|United-States| null|
| 6|31-50| HS-grad| Prof-specialty|Not-in-family|Female|United-States|LessThan50K|
| 7|31-50| Prof-school| Prof-specialty| Husband| null| India|MoreThan50K|
| 8|18-21|Some-college| Adm-clerical| Own-child|Female|United-States|LessThan50K|
| 9| >50| HS-grad| Farming-fishing| Husband| Male|United-States|LessThan50K|
| 10| >50| Assoc-voc| Prof-specialty| Husband| Male|United-States|LessThan50K|
| 11| >50| HS-grad| Sales| Husband|Female|United-States|MoreThan50K|
| 12| null| Bachelors| Exec-managerial| Husband| null|United-States|MoreThan50K|
| 13|22-30| HS-grad| Craft-repair|Not-in-family| Male|United-States|LessThan50K|
| 14|31-50| Assoc-acdm| Exec-managerial| Unmarried| Male|United-States|LessThan50K|
| 15|22-30|Some-college| Sales| Own-child| Male|United-States|LessThan50K|
| 16| >50|Some-college| Exec-managerial| Unmarried|Female|United-States| null|
| 17|31-50| HS-grad| Adm-clerical|Not-in-family|Female|United-States|LessThan50K|
| 18|31-50| 10th|Handlers-cleaners| Husband| Male|United-States|LessThan50K|
| 19|31-50| HS-grad| Sales| Husband| Male| Iran|MoreThan50K|
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
# Runs a job to compute repair updates for the seven NULL cells above in `dirty_df`
# A `repaired` column represents proposed updates to repiar them
>>> from repair.errors import NullErrorDetector
>>> repair_updates_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([NullErrorDetector()]) \
... .run()
>>> repair_updates_df.show()
+---+---------+-------------+-----------+
|tid|attribute|current_value| repaired|
+---+---------+-------------+-----------+
| 7| Sex| null| Female|
| 12| Age| null| 18-21|
| 12| Sex| null| Female|
| 3| Sex| null| Female|
| 5| Age| null| 18-21|
| 5| Income| null|MoreThan50K|
| 16| Income| null|MoreThan50K|
+---+---------+-------------+-----------+
# You need to set `True` to `repair_data` for getting repaired data directly
>>> clean_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([NullErrorDetector()]) \
... .run(repair_data=True)
>>> clean_df.show()
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
|tid| Age| Education| Occupation| Relationship| Sex| Country| Income|
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
| 0|31-50|Some-college| Craft-repair| Husband| Male|United-States|LessThan50K|
| 1| >50|Some-college| Exec-managerial| Own-child|Female|United-States|LessThan50K|
| 2|31-50| Bachelors| Sales| Husband| Male|United-States|LessThan50K|
| 3|22-30| HS-grad| Craft-repair| Own-child| Male|United-States|LessThan50K|
| 4|22-30| HS-grad| Farming-fishing| Husband|Female|United-States|LessThan50K|
| 5|31-50|Some-college| Craft-repair| Husband| Male|United-States|LessThan50K|
| 6|31-50| HS-grad| Prof-specialty|Not-in-family|Female|United-States|LessThan50K|
| 7|31-50| Prof-school| Prof-specialty| Husband| Male| India|MoreThan50K|
| 8|18-21|Some-college| Adm-clerical| Own-child|Female|United-States|LessThan50K|
| 9| >50| HS-grad| Farming-fishing| Husband| Male|United-States|LessThan50K|
| 10| >50| Assoc-voc| Prof-specialty| Husband| Male|United-States|LessThan50K|
| 11| >50| HS-grad| Sales| Husband|Female|United-States|MoreThan50K|
| 12|31-50| Bachelors| Exec-managerial| Husband| Male|United-States|MoreThan50K|
| 13|22-30| HS-grad| Craft-repair|Not-in-family| Male|United-States|LessThan50K|
| 14|31-50| Assoc-acdm| Exec-managerial| Unmarried| Male|United-States|LessThan50K|
| 15|22-30|Some-college| Sales| Own-child| Male|United-States|LessThan50K|
| 16| >50|Some-college| Exec-managerial| Unmarried|Female|United-States|LessThan50K|
| 17|31-50| HS-grad| Adm-clerical|Not-in-family|Female|United-States|LessThan50K|
| 18|31-50| 10th|Handlers-cleaners| Husband| Male|United-States|LessThan50K|
| 19|31-50| HS-grad| Sales| Husband| Male| Iran|MoreThan50K|
+---+-----+------------+-----------------+-------------+------+-------------+-----------+
# Or, you can merge the computed repair updates with the input table as follows
>>> repair_updates_df.createOrReplaceTempView("predicted")
>>> clean_df = delphi.misc.options({"repair_updates": "predicted", "table_name": "adult", "row_id": "tid"}).repair()
>>> clean_df.show()
<the same output above>
For more running examples, please check Python scripts in the resources/examples folder.
NOTE: There are many types of errors on dirty data [9], but our purpose is to repair the data
whose attribute already has correct values against their errors.
For instance, in the Sex
column in the adult
table above, our plugin can repair the three NULL cells
because it already has correct values, Female
or Male
, against the NULL cells.
To repair them, our plugin captures and exploits data dependencies between the Sex
column and the other ones.
For repairing the other types of data errors, existing data cleaning tools might be suitable;
a programming-by-examples technique is a good fit to fix format errors like 2021.8.23
-> 2021/8/23
and
Trifacta has a functionality,
named Transformation by Example,
to implement it. Few existing tools can handle the error cases in the adult
example above and,
therefore, our plugin is complementary to those other tools.
To detect error cells, you can use some of bult-in error detectors below:
Please check the example code for how to use these error detectors.
If you specify no error detector, DomainValues
s for each attribute and NullErrorDetector
are used by default.
# Setting `True` to `detect_errors_only` lets you get detected error cells only
>>> error_cells_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([NullErrorDetector()]) \
... .run(detect_errors_only=True)
>>> error_cells_df.show()
+---+---------+-------------+
|tid|attribute|current_value|
+---+---------+-------------+
| 12| Age| null|
| 5| Age| null|
| 12| Sex| null|
| 7| Sex| null|
| 3| Sex| null|
| 16| Income| null|
| 5| Income| null|
+---+---------+-------------+
# `DomainValue`s and `NullErrorDetector` are used by default
>>> error_cells_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .run(detect_errors_only=True)
>>> error_cells_df.show()
+---+----------+--------------+
|tid| attribute| current_value|
+---+----------+--------------+
| 12| Age| null|
| 5| Age| null|
| 7| Sex| null|
| 12| Sex| null|
| 3| Sex| null|
| 5| Income| null|
| 16| Income| null|
| 4| Age| 22-30|
| 8| Age| 18-21|
| 3| Age| 22-30|
| 13| Age| 22-30|
| 15| Age| 22-30|
| 10| Education| Assoc-voc|
| 7| Education| Prof-school|
| 14| Education| Assoc-acdm|
| 12| Education| Bachelors|
| 2| Education| Bachelors|
| 18| Education| 10th|
| 0|Occupation| Craft-repair|
| 6|Occupation|Prof-specialty|
+---+----------+--------------+
only showing top 20 rows
Note that ConstraintErrorDetector
is the most powerful choice; it uses denial constraints [5]
that an input tabular data should follow. The constraints consist of the predicates that cannot hold true simultaneously.
# Constraints below mean that `Sex="Female"` and `Relationship="Husband"`
# (`Sex="Male"` and `Relationship="Wife"`) does not hold true simultaneously.
# Note that the syntax for denial constraints follows the HoloClean [7] one and
# it is a research-backed statistical inference engine to clean data.
$ cat ./testdata/adult_constraints.txt
t1&EQ(t1.Sex,"Female")&EQ(t1.Relationship,"Husband")
t1&EQ(t1.Sex,"Male")&EQ(t1.Relationship,"Wife")
# Use the constraints to detect errors and then repair them
>>> repair_updates_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([NullErrorDetector(), ConstraintErrorDetector(constraint_path="./testdata/adult_constraints.txt")]) \
... .run()
# Changes values from `Female` to `Male` in the `Sex` cells
# of the 4th and 11th rows.
>>> repair_updates_df.show()
+---+------------+-------------+-----------+
|tid| attribute|current_value| repaired|
+---+------------+-------------+-----------+
| 3| Sex| null| Male|
| 4|Relationship| Husband| Husband|
| 4| Sex| Female| Male|
| 5| Age| null| 31-50|
| 5| Income| null|LessThan50K|
| 7| Sex| null| Male|
| 11|Relationship| Husband| Husband|
| 11| Sex| Female| Male|
| 12| Age| null| 31-50|
| 12| Sex| null| Male|
| 16| Income| null|LessThan50K|
+---+------------+-------------+-----------+
# If the "adult" table has a functional dependency from "Age" to "Income",
# its dependency is represented as a following denial constraint:
>>> repair_updates_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([ConstraintErrorDetector(constraints="t1&t2&EQ(t1.Age,t2.Age)&IQ(t1.Income,t2.Income)")]) \
... .run()
# Or, you can use syntactic sugar instead
>>> repair_updates_df = delphi.repair \
... .setInput("adult") \
... .setRowId("tid") \
... .setErrorDetectors([ConstraintErrorDetector(constraints="Age->Income")]) \
... .run()
If you want to select some of repaired updates based on theier probabilities, you can set True
to
compute_repair_prob
for getting the probabilities from built statistical models.
# To get predicted probabilities, computes repair updates with `compute_repair_prob`=`True`
>>> repair_updates_df = delphi.repair.setInput("adult").setRowId("tid").run(compute_repair_prob=True)
>>> repair_updates_df.show()
+---+---------+-------------+-----------+------------------+
|tid|attribute|current_value| repaired| prob|
+---+---------+-------------+-----------+------------------+
| 3| Sex| null| Female|0.6664498420338913|
| 7| Sex| null| Female|0.7436767447201434|
| 16| Income| null|MoreThan50K|0.8721610530603738|
| 5| Age| null| 18-21|0.3018171710707878|
| 5| Income| null|MoreThan50K|0.8333912988626406|
| 12| Age| null| 18-21|0.3598905853884847|
| 12| Sex| null| Female|0.7436767447201434|
+---+---------+-------------+-----------+------------------+
# Applies the repair udpates whose probabilities are greater than 0.70
>>> repair_updates_df.where("prob > 0.70").createOrReplaceTempView("predicted")
>>> clean_df = delphi.misc.options({"repair_updates": "predicted", "table_name": "adult", "row_id": "tid"}).repair()
>>> clean_df.show()
<output with the four cells repaired>
You can run a repair job (main.py) on your Spark cluster as following:
$ echo $SPARK_HOME
/tmp/spark-3.2.0-bin-hadoop3.2
$ ./bin/spark-submit ./python/main.py --input adult --output repaired --row-id tid
Predicted repair values are saved as 'repaired'
$ $SPARK_HOME/bin/spark-shell
scala> spark.table("repaired").show()
+---+---------+-------------+-----------+
|tid|attribute|current_value| repaired|
+---+---------+-------------+-----------+
| 7| Sex| null| Female|
| 12| Age| null| 18-21|
| 12| Sex| null| Female|
| 3| Sex| null| Female|
| 5| Age| null| 18-21|
| 5| Income| null|MoreThan50K|
| 16| Income| null|MoreThan50K|
+---+---------+-------------+-----------+
delphi.repair
// Basic Parameters
.setDbName(str) // database name (default: '')
.setInput(str) // table name or `DataFrame`
.setRowId(str) // unique column name in table
.setTargets(list) // target attribute list to repair
// Parameters for Error Detection
.setErrorCells(str) // user-specified error cells
.setErrorDetectors(list) // list of error detector implementations (`NullErrorDetector`, `DomainValues`, `RegExErrorDetector`, `ConstraintErrorDetector`, and `GaussianOutlierErrorDetector`)
.setDiscreteThreshold(int) // max domain size of discrete values (default: 80)
// Parameters for Repair Model Training
.setRepairByRules(bool) // whether to enable rule-based repair techniques, e.g., using functional dependencies and merging nearest values (default: False)
.setParallelStatTrainingEnabled(bool) // whether to run multiples tasks to build stat repair models (default: False)
.setTrainingDataRebalancingEnabled(bool) // whether to rebalance class labels in training data (default: False)
// Parameters for Repairing
.setRepairDelta(int) // max number of applied repairs
// Running Mode Parameters
.run(
detect_errors_only=bool, // whether to return detected error cells (default: False)
compute_repair_candidate_prob=bool, // whether to return probabiity mass function of candidate repairs (default: False)
compute_repair_prob=bool, // whether to return probabiity of predicted repairs
repair_data=bool // whether to return repaired data
)
If you hit some bugs and have requests, please leave some comments on Issues or Twitter (@maropu).