tabular-io / iceberg-kafka-connect

Apache License 2.0

DRAFT: Dead letter implementations #244

Closed fqtab closed 4 months ago

fqtab commented 5 months ago

It might be a little hard to see that WriteExceptionHandler is all we really need to add to cover every use case. This PR demonstrates how the most common use cases fit within the WriteExceptionHandler framework.

I want to write out the original byte array to a dead-letter-table

In theory (i.e. NOT definitively proven yet) ...

For the use case of writing out the original Kafka key/value byte arrays, we have to do a little bit of work:

Importantly, what I'm trying to avoid is having a special record structure that the connector code needs to understand in order to do the right thing. The ideal situation for me is that the connector is oblivious to the difference between "good" records and "dead-letter-records-from-Converter/SMT-failures."

I want to write out the original JSON to the dead-letter-table

Same approach as above

I want to capture deserialization/SMT failures somewhere!

Two options:

I want to capture null records somewhere!

Two options:

I don't want to write bad records to Iceberg, I want to write to Kafka!

See KafkaDLQ for a sample implementation.

fqtab commented 5 months ago

@tabmatfournier the other PR just FYI

tabmatfournier commented 5 months ago

> Importantly, what I'm trying to avoid is having a special record structure that the connector code needs to understand in order to do the right thing. The ideal situation for me is that the connector is oblivious to the difference between "good" records and "dead-letter-records-from-Converter/SMT-failures."

TBH you are still doing this; you're just changing the shape of what the special record is. The connector has to know about dead-letter-records-from-converter during failure in order to dig out the original bytes.

tabmatfournier commented 5 months ago
> • I'm thinking in the SinkRecord's key, the SinkRecord's headers, or the SinkRecord's schema parameters

Not this simple.

Headers are probably the best bet but least nice to work with. This is why I went for the custom struct approach. Either way you are writing some custom code that knows how to dig this out of the record, and putting that into the connector. In my approach this was determined by a config option: not using the dead letter/error transform? Then nothing goes looking for this (no runtime cost).

fqtab commented 5 months ago

> The connector has to know about dead-letter-records-from-converter during failure in order to dig out the original bytes.

The connector code doesn't need to know. The WriteExceptionHandler code will need to know.

> SinkRecord may not have a schema (it's possible it's a Map, for instance, in which case the schema is null). Even if it does have a schema, the parameters are a Map<String, String>, so you would have to base64-encode your bytes in order to put them in (and decode them later) if you wanted to do this.

Let's definitely not use schema parameters then lol (that's why I had it last in the list)
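
For reference, the Base64 round trip described in the quoted text might look roughly like the sketch below. It works on a plain Map<String, String> rather than on a real Connect schema, and the class name, method names, and parameter name are all hypothetical, chosen only for illustration.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: stores raw bytes in a String-only parameter map.
final class SchemaParameterBytes {
  // Assumed parameter name; nothing in this thread defines one.
  static final String ORIGINAL_VALUE_PARAM = "original.value.base64";

  // Encode the bytes as Base64 text so they fit in a Map<String, String>.
  static void putBytes(Map<String, String> parameters, String name, byte[] bytes) {
    parameters.put(name, Base64.getEncoder().encodeToString(bytes));
  }

  // Decode the bytes again; returns null if the parameter is absent.
  static byte[] getBytes(Map<String, String> parameters, String name) {
    String encoded = parameters.get(name);
    return encoded == null ? null : Base64.getDecoder().decode(encoded);
  }

  public static void main(String[] args) {
    Map<String, String> parameters = new HashMap<>();
    byte[] original = {0, -1, 127, 42}; // not valid UTF-8, so a plain String would corrupt it
    putBytes(parameters, ORIGINAL_VALUE_PARAM, original);
    byte[] restored = getBytes(parameters, ORIGINAL_VALUE_PARAM);
    System.out.println(java.util.Arrays.equals(original, restored));
  }
}
```

The extra encode/decode step on every record, plus the null-schema case, is the cost being weighed against the key and headers options.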

> You would have to overwrite the key and assume no one is using it downstream if you want to use the key.

Yea absolutely, but I thought the idea with the ExceptionHandlingTransform was it would be the one and only SMT involved (it can run user supplied SMTs internally). So there would be no downstream (other than SinkTask.put obviously)?

Even with your approach, if another SMT is involved downstream of the ExceptionHandlingTransform, it could break things, no?

> Headers are probably the best bet but least nice to work with.

I would prefer key for that reason personally.

> In my approach this was determined by a config option: not using the dead letter/error transform? Then nothing goes looking for this (no runtime cost).

This approach should also have no runtime cost if you're not using a WriteExceptionHandler. (The code will use the MultiTableWriter instead of wrapping that in an ExceptionHandlingTableWriter if no WriteExceptionHandler is supplied).
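
A minimal sketch of that wiring, assuming heavily simplified stand-in types: the names MultiTableWriter, ExceptionHandlingTableWriter, and WriteExceptionHandler come from this PR, but the signatures below (String records, a single write method, the Writers factory) are hypothetical and not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the connector's writer abstraction (hypothetical signature).
interface TableWriter {
  void write(String record);
}

// Simplified stand-in: returns a replacement record to write, or null to drop the record.
interface WriteExceptionHandler {
  String handle(String record, Exception exception);
}

final class MultiTableWriter implements TableWriter {
  final List<String> written = new ArrayList<>();

  @Override
  public void write(String record) {
    if (record.isEmpty()) {
      throw new IllegalArgumentException("empty record"); // simulated write failure
    }
    written.add(record);
  }
}

// Decorator that only exists when a handler is configured.
final class ExceptionHandlingTableWriter implements TableWriter {
  private final TableWriter delegate;
  private final WriteExceptionHandler handler;

  ExceptionHandlingTableWriter(TableWriter delegate, WriteExceptionHandler handler) {
    this.delegate = delegate;
    this.handler = handler;
  }

  @Override
  public void write(String record) {
    try {
      delegate.write(record);
    } catch (Exception e) {
      String replacement = handler.handle(record, e);
      if (replacement != null) {
        delegate.write(replacement);
      }
    }
  }
}

final class Writers {
  // No handler supplied: return the plain writer, so there is no wrapper or try/catch on the write path.
  static TableWriter create(TableWriter base, WriteExceptionHandler handlerOrNull) {
    return handlerOrNull == null ? base : new ExceptionHandlingTableWriter(base, handlerOrNull);
  }
}
```

Because the decision is made once at construction time, a deployment without a handler pays nothing per record.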

tabmatfournier commented 5 months ago

I'm hesitant to override the key. We are now providing a lot of customization downstream, and people may be digging information out of the key. I don't think we can override/replace what is there.