What?
We've seen a single instance in production where a couple of data files were committed to a table twice in the same append operation.
While this appears to be an isolated incident, it is clearly not a good sign with respect to our exactly-once guarantees.
As a result, this PR introduces a short-term fix to avoid committing the same file twice in the same operation, AND adds logging to help detect this situation more quickly and with more contextual information.
How?
Deduplicates data and delete files received in a batch of messages from Kafka before committing them to the table.
Important to note that this does NOT eliminate data/delete files duplicated across batches of messages read from Kafka. This PR is strictly concerned with data and delete file duplication within a given batch of messages.
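As a rough illustration of the in-batch deduplication, here is a minimal sketch that keys files on their path and keeps the first occurrence. The `CommittableFile` record and `dedupe` method are hypothetical stand-ins for this example, not the connector's actual types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of in-batch deduplication keyed on file path.
// CommittableFile is a hypothetical stand-in for the data/delete file
// representation carried in the envelopes; the real connector types differ.
final class BatchDeduplication {

  record CommittableFile(String path, long recordCount) {}

  static List<CommittableFile> dedupe(List<CommittableFile> files) {
    // LinkedHashMap keeps only the first occurrence of each path while
    // preserving the original commit order of the batch.
    Map<String, CommittableFile> byPath = new LinkedHashMap<>();
    for (CommittableFile file : files) {
      byPath.putIfAbsent(file.path(), file);
    }
    return new ArrayList<>(byPath.values());
  }
}
```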
Adds logging to help identify where the duplicates may be stemming from (a rough sketch of how the summary could be assembled follows the list below). Duplication of data files in a batch of messages will generally manifest in one of three ways:
1. The same file appears in two equivalent envelopes, e.g. the Coordinator read the same message twice from Kafka. In this case, you should see a log message similar to:
Deduplicated 2 data files with the same path=data.parquet for table=db.tbl during commit-id=cf602430-0f4d-41d8-a3e9-171848d89832 from the following events=[2x(SimpleEnvelope{...})]
2. The same file appears in two different envelopes, e.g. a Worker sent the same message twice to Kafka. In this case, you should see a log message similar to:
Deduplicated 2 data files with the same path=data.parquet for table=db.tbl during commit-id=cf602430-0f4d-41d8-a3e9-171848d89832 from the following events=[1x(SimpleEnvelope{...}), 1x(SimpleEnvelope{...})]
3. The same file appears twice in a single envelope, e.g. a Worker included the same file twice in a single message sent to Kafka. In this case, you should see a log message similar to:
Deduplicated 2 data files with the same path=data.parquet in the same event=SimpleEnvelope{...} for table=db.tbl during commit-id=cf602430-0f4d-41d8-a3e9-171848d89832
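To make the first two cases above concrete, here is a hedged sketch of how the events=[...] part of the log line could be built by counting how many times each envelope contributed the duplicated path. `FileOccurrence` and `summarize` are illustrative names for this sketch, not the connector's actual API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch: build the "events=[...]" summary for one duplicated path
// by counting how many times each envelope contributed that path.
final class DuplicateSummary {

  // The envelope is represented by its String form here (e.g. "SimpleEnvelope{...}");
  // the real code would work with the envelope objects themselves.
  record FileOccurrence(String path, String envelope) {}

  static String summarize(List<FileOccurrence> occurrences) {
    Map<String, Long> countsByEnvelope =
        occurrences.stream()
            .collect(Collectors.groupingBy(
                FileOccurrence::envelope, LinkedHashMap::new, Collectors.counting()));
    return countsByEnvelope.entrySet().stream()
        .map(e -> e.getValue() + "x(" + e.getKey() + ")")
        .collect(Collectors.joining(", ", "[", "]"));
  }
}
```

With this shape, two occurrences from equal envelopes collapse into [2x(SimpleEnvelope{...})] (case 1), while two occurrences from distinct envelopes yield [1x(SimpleEnvelope{...}), 1x(SimpleEnvelope{...})] (case 2); case 3 is reported with the separate single-event message shown above.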