Open lisad opened 6 months ago
@YuliaS there are a few different ways to do this. Imagine we have a phase as follows and it adds a column in step2
@row_step
def step2(row):
row['my_column'] = 'lisa'
return row
Phase(name="transformer", steps=[ step1, step2, step3])
Should we save all the dropped rows in a dropped-rows file AS they are dropped? The ones dropped in step 1 won't have the new column 'my_column'. The ones dropped in step3 will have the new column 'my_column'.
Or, should we save the list of dropped rows as they appeared BEFORE the phase? then none of the rows dropped in the phase will have the new column 'my_column'.
It is also possible of course to save the list of dropped rows as they appeared before the pipeline even began, but I don't think that fits as nicely with the way checkpoints and dropped-row messages are organized by phase.
Another question: should we save the list of dropped rows in the same format that the pipeline normally handles data (currently CSV)? The alternative would be to save a text file that COULD be parsed but it might be a little harder. Or we could always save the set of dropped rows in a JSON file which allows them all to have different fields even as fields are added and removed through the pipeline.
I'm willing to do it differently, but if you're indifferent, my preference is:
I think that lines up nicely with checkpoints - at the end of every phase, the checkpoint has all the rows kept; at the end of the next phase there is a file of rows dropped with the same columns that appeared at the end of the first phase.
we could probably even find out all the dropped rows during a batch step by comparing the batch before and after