databricks / iceberg-kafka-connect


Dead letter table #233

Open tabmatfournier opened 7 months ago

tabmatfournier commented 7 months ago

Implements an (optional) dead letter table where failed messages go to a dedicated Iceberg table. This functionality aims to improve and simplify the error handling provided by Kafka Connect. Kafka Connect's dead letter queue only handles deserialization and SMT failures, and it writes to another Kafka topic, where additional engineering effort is required to inspect and recover messages. With this PR, errors are written to a dedicated Iceberg table where messages can be inspected and recovered using tools users may be more comfortable with (Spark, etc.). The table row contains everything required to convert a row back into a Kafka ProducerRecord; however, the functionality to do this is engine specific and not provided in this PR.

| Location of failure | Kafka Connect DLQ | This PR |
|---|---|---|
| Deserialization/Converter | Yes | Yes |
| SMT | Yes | Yes |
| Table creation / schema issues | No | Yes |
| Iceberg record conversion | No | Yes |
| Malformed records (e.g. missing table route information) | No | Yes |
| Schema evolution issues | No | Yes |

This PR aims to minimize stream processing exceptions caused by imperfect producers by writing failures to the Dead Letter Table rather than repeatedly failing and triggering rebalances inside the Kafka Connect cluster, which can negatively affect other jobs.

It consists of two components:

  1. An ErrorTransform SMT that wraps the Deserializer and zero or more SMTs
  2. Changes to Worker.java / IcebergWriterFactory.java to catch issues around table creation, schema parsing, and Iceberg record conversion

Not all errors result in records being written to the Dead Letter Table. For example, network/connection errors thrown during table operations with the underlying Catalog will still fail the connector (as a form of retry when the connector is restarted).

This is opt-in. Users can decide not to use it and instead use the Kafka Connect DLQ, ignore errors, or fail on all errors, just like the previous functionality.

Error Transform

Kafka Connect's key, value, and header converters must be ByteArrayConverters. The desired converters (AvroConverter, JsonConverter, etc.) are supplied to the ErrorTransform SMT, along with any other SMTs.
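For illustration, a connector configuration might look roughly like the sketch below. The `transforms.error.*` property names and the SMT class path are assumptions for this sketch, not the final API; only the ByteArrayConverter class is standard Kafka Connect.

```properties
# Sketch only: transforms.error.* property names and the SMT class path are assumptions.
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
topics=events

# The connector-level converters must be ByteArrayConverters so raw bytes
# reach the ErrorTransform SMT intact.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# The real converter (and any downstream SMTs) are supplied to the ErrorTransform.
transforms=error
transforms.error.type=io.tabular.iceberg.connect.transforms.ErrorTransform
transforms.error.value.converter=org.apache.kafka.connect.json.JsonConverter
transforms.error.value.converter.schemas.enable=false
```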

Changes to Worker.java / IcebergWriterFactory.java

When configured to use the Dead Letter Table, the connector expects messages to be in the shape of the data produced by the ErrorTransform SMT. Failed records from the SMT are written to the Dead Letter Table. Successfully transformed SinkRecords follow the normal connector flow; if that flow fails for non-transient reasons, the original key, value, and header bytes from the transformed record are used to construct a SinkRecord for the Dead Letter Table, with the required Kafka and error metadata, before being written via the normal table fanout.
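Conceptually, that fallback path might look like the sketch below. All class and method names here (DeadLetterAwareWriter, RecordWriter, FailedRecordFactory, isTransient) are hypothetical and only illustrate the flow described above, not the PR's actual code.

```java
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch only: these are illustrative names, not the PR's classes.
class DeadLetterAwareWriter {

  interface RecordWriter {
    void write(SinkRecord record);
  }

  interface FailedRecordFactory {
    // Rebuilds a record for the dead letter table from the original
    // key/value/header bytes plus Kafka and error metadata.
    SinkRecord create(SinkRecord original, Exception error);
  }

  private final RecordWriter tableWriter;       // normal table fanout
  private final RecordWriter deadLetterWriter;  // writes to the dead letter table
  private final FailedRecordFactory failedRecordFactory;

  DeadLetterAwareWriter(
      RecordWriter tableWriter,
      RecordWriter deadLetterWriter,
      FailedRecordFactory failedRecordFactory) {
    this.tableWriter = tableWriter;
    this.deadLetterWriter = deadLetterWriter;
    this.failedRecordFactory = failedRecordFactory;
  }

  void save(SinkRecord record) {
    try {
      tableWriter.write(record); // normal connector flow
    } catch (Exception e) {
      if (isTransient(e)) {
        // Catalog/network errors still fail the connector so it retries on restart.
        throw new ConnectException(e);
      }
      deadLetterWriter.write(failedRecordFactory.create(record, e));
    }
  }

  private boolean isTransient(Exception e) {
    // Placeholder classification; real logic would inspect the exception type.
    return false;
  }
}
```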

Limitations

This is the first PR. Additional work is required for some advanced Converters, such as the AvroConverter, where fine-grained exception handling needs to be implemented to differentiate between real Avro errors (e.g. the message is not valid Avro bytes, or the message does not have an entry in the Schema Registry) and network-related Avro exceptions (e.g. contacting the Schema Registry times out). This is planned in future PRs. In the interim, an ErrorHandler class is exposed as a config option for both converters and SMTs and can be extended by users to implement the required error handling (and rethrowing) for advanced Converters, custom Converters, etc.
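As an illustration, a user-supplied handler for the AvroConverter case might look roughly like the following. The ErrorHandler name comes from the description above, but the interface shape shown here and the exception classification are assumptions for the sketch, not the connector's actual API.

```java
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical interface shape; the actual interface exposed by the connector may differ.
interface ErrorHandler {
  SinkRecord handle(SinkRecord original, Exception error);
}

// Sketch of a handler that separates "real" Avro failures from network-related ones.
class AvroAwareErrorHandler implements ErrorHandler {

  @Override
  public SinkRecord handle(SinkRecord original, Exception error) {
    if (hasCause(error, IOException.class)) {
      // Likely a Schema Registry connectivity issue (e.g. a timeout):
      // rethrow so the connector fails and retries instead of dead-lettering.
      throw new ConnectException(error);
    }
    if (error instanceof SerializationException) {
      // Genuinely bad Avro bytes or an unknown schema: send to the dead letter table
      // (in this sketch, returning the record signals dead letter handling).
      return original;
    }
    throw new ConnectException(error);
  }

  private static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
    for (Throwable c = t; c != null; c = c.getCause()) {
      if (type.isInstance(c)) {
        return true;
      }
    }
    return false;
  }
}
```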

tabmatfournier commented 6 months ago

Substantial re-work. Still need to add tests for ErrorHandlingRecordRouter but the code is more or less there now.

tabmatfournier commented 6 months ago

I have more comments, but I'm struggling to get past some of the limitations of the current approach, like the fixed schema. I have a different take on the problem that I would strongly like for us to consider:

  • Exceptions happening within SinkTask.put would be captured by a user-configured WriteExceptionHandler and handled however the user wants (write to a dead letter table, write to Kafka, log it, etc.)
  • For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points to the dead letter table (a rough sketch of such an SMT follows below)
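A minimal sketch of the exception-handling SMT idea from the second bullet, assuming dynamic routing via a route field: the field name `_route`, the table name `default.dead_letters`, and the stand-in delegate are examples only, not part of any existing implementation.

```java
import java.util.Map;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical sketch: on failure, emit a record whose route field points at a
// dead letter table so the connector's dynamic routing delivers it there.
public class RouteFailuresToDeadLetter implements Transformation<SinkRecord> {

  private static final String ROUTE_FIELD = "_route"; // must match iceberg.tables.route-field
  private static final String DEAD_LETTER_TABLE = "default.dead_letters";

  private static final Schema FAILURE_SCHEMA =
      SchemaBuilder.struct()
          .field(ROUTE_FIELD, Schema.STRING_SCHEMA)
          .field("topic", Schema.STRING_SCHEMA)
          .field("partition", Schema.INT32_SCHEMA)
          .field("offset", Schema.INT64_SCHEMA)
          .field("error", Schema.STRING_SCHEMA)
          .build();

  // Stand-in for whatever record processing might throw; real code would
  // delegate to configured converters/SMTs.
  private UnaryOperator<SinkRecord> delegate = r -> r;

  @Override
  public SinkRecord apply(SinkRecord record) {
    try {
      return delegate.apply(record);
    } catch (Exception e) {
      Struct failure =
          new Struct(FAILURE_SCHEMA)
              .put(ROUTE_FIELD, DEAD_LETTER_TABLE)
              .put("topic", record.topic())
              .put("partition", record.kafkaPartition())
              .put("offset", record.kafkaOffset())
              .put("error", String.valueOf(e.getMessage()));
      return record.newRecord(record.topic(), record.kafkaPartition(), null, null,
          FAILURE_SCHEMA, failure, record.timestamp());
    }
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {}

  @Override
  public void close() {}
}
```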

See #243 and #244 for draft implementations of this idea.

This has largely been addressed in the latest update; I appreciate the feedback and the discussions we've had.

tabmatfournier commented 6 months ago

Need to add a config for the third mode. Look at IntegrationMultiTableTest, where both iceberg.tables and a regex are set -- I can't differentiate this case from static routing (with dynamic fallback for dead letter routing). Hrm.

ron-trail commented 6 months ago

Super useful functionality, thanks!

ron-trail commented 5 months ago

Hi, any ETA for this feature?

okayhooni commented 5 months ago

Thank you for implementing this valuable feature!

It will solve many issues like the ones below

preetpuri commented 1 month ago

Hi, do you have an ETA for when this PR will be merged?

Daesgar commented 1 month ago

This would be much appreciated, as we are dealing with this problem and skipping the problematic offset leads to data loss. 😞