MI-DPLA / combine

Combine /kämˌbīn/ - Metadata Aggregator Platform
MIT License

Determining `transformed` column with fingerprinting in Transform can return more Records than input #229

Closed by ghukill 6 years ago

ghukill commented 6 years ago

Given a Job with some errors, the following can return a dataframe with more rows than were provided as input:

```python
from pyspark.sql import functions as pyspark_sql_functions

# Left join on fingerprint: fingerprints are not unique across Records
# (e.g. identical error stubs), so one transformed row can match several
# input rows and the join fans out.
records_trans = records_trans.alias("records_trans").join(
    input_records.alias("input_records"),
    input_records.fingerprint == records_trans.fingerprint,
    'left'
).select(
    *['records_trans.%s' % c for c in records_trans.columns if c not in ['transformed']],
    pyspark_sql_functions.when(
        pyspark_sql_functions.isnull(pyspark_sql_functions.col('input_records.fingerprint')),
        pyspark_sql_functions.lit(True)
    ).otherwise(pyspark_sql_functions.lit(False)).alias('transformed')
)
```
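A minimal sketch, with hypothetical data rather than the actual Combine dataframes, of why this fans out: a left join on a non-unique key produces one output row per matching right-side row.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Two Records share a fingerprint, as can happen with error stubs.
records_trans = spark.createDataFrame(
    [("a", 111), ("b", 111)], ["combine_id", "fingerprint"])
input_records = spark.createDataFrame(
    [("a", 111), ("b", 111)], ["combine_id", "fingerprint"])

joined = records_trans.join(
    input_records,
    records_trans.fingerprint == input_records.fingerprint,
    "left")
print(joined.count())  # 4: each of the 2 left rows matches both right rows
```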
ghukill commented 6 years ago

Confirmed: deleting the error Records from the input Job restored parity between input and output record counts.
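A hypothetical spot check for that parity, assuming the records_trans and input_records dataframes are in scope:

```python
# Should hold once the error Records are removed from the input Job.
assert records_trans.count() == input_records.count()
```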

ghukill commented 6 years ago

Fixed: rewrote the fingerprint comparison to join on combine_id, then compare the fingerprint columns and write the result to the transformed column:

```python
# Join on combine_id, which is unique per Record, so the left join stays
# one-to-one; transformed is True only when the fingerprint changed.
records_trans = records_trans.alias("records_trans").join(
    input_records.alias("input_records"),
    input_records.combine_id == records_trans.combine_id,
    'left'
).select(
    *['records_trans.%s' % c for c in records_trans.columns if c not in ['transformed']],
    pyspark_sql_functions.when(
        records_trans.fingerprint != input_records.fingerprint,
        pyspark_sql_functions.lit(True)
    ).otherwise(pyspark_sql_functions.lit(False)).alias('transformed')
)
```
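A quick sketch of the fixed approach on the same kind of hypothetical data, reusing the spark session from the earlier sketch; record "b" is the one the transform changed:

```python
from pyspark.sql import functions as F

records_trans = spark.createDataFrame(
    [("a", 111), ("b", 222)], ["combine_id", "fingerprint"])
input_records = spark.createDataFrame(
    [("a", 111), ("b", 111)], ["combine_id", "fingerprint"])

result = records_trans.alias("t").join(
    input_records.alias("i"),
    F.col("t.combine_id") == F.col("i.combine_id"),
    "left"
).select(
    F.col("t.combine_id"),
    F.when(F.col("t.fingerprint") != F.col("i.fingerprint"), F.lit(True))
     .otherwise(F.lit(False)).alias("transformed"))

result.show()  # 2 rows, matching the input count: ("a", false), ("b", true)
```

One design note: if the left join ever finds no match, the inequality evaluates to null and when() falls through to otherwise(), so transformed defaults to False here, whereas the original isnull() check marked unmatched rows True.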