databricks / iceberg-kafka-connect

Add WriteExceptionHandler #243

Closed fqtab closed 5 months ago

fqtab commented 7 months ago

Adds a configuration option for specifying a WriteExceptionHandler that handles any exceptions thrown while writing SinkRecords to files.
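
For reference, here's a minimal sketch of what such a handler could look like. The method names and signatures below are illustrative only, not the actual interface in this PR's diff:

```java
// Hypothetical sketch: names and signatures are illustrative, not the interface
// actually proposed in this PR.
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface WriteExceptionHandler {

  /** Called once at task start so the handler can read config and keep the task context. */
  void initialize(SinkTaskContext context, Map<String, String> config);

  /**
   * Called when writing a SinkRecord fails. An implementation can rethrow to fail the task,
   * return null to drop the record, or return a rewritten record for the connector to
   * route elsewhere (e.g. to a dead-letter table).
   */
  SinkRecord handle(SinkRecord record, Exception exception);
}
```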

I haven't tested any of this yet :D but in theory:

Pros

  1. Doesn't require a specific schema for dead-letter tables
    • The nice thing here is users can have any schema for their "dead-letter" table
    • If you only want to write out topic, partition, and offset, you can
    • If you want to write out keys and values as byte[], you can
    • If you want to write out keys and values as JSON, you can
  2. Doesn't limit you to a single dead-letter-table per connector
    • Users can have as many or as few dead letter tables as they want, just write an appropriate handler to do it
    • E.g. if a user wants a dead-letter-table per topic, users can just write a WriteExceptionHandler that sends bad records from different topics to different tables (see the sketch after this list).
  3. The connector code remains completely oblivious to Converter/SMT failures
  4. Users have fine-grained control over which exceptions to handle (or not)
  5. Similarly, we (maintainers) don't have to worry about adding exception handling code to the connector
    • I'm particularly worried about REST catalogs, which only really have a REST spec. There is no guarantee that an exception that is non-transient for one REST catalog is non-transient for another.
  6. Users aren't forced to write bad records to an iceberg table
    • Although I expect this feature to be used mostly for writing bad records to an iceberg table, it's not compulsory
    • Some users might want to write to Kafka and this is easy to do in Kafka Connect via the SinkTaskContext.errantRecordReporter.report API
    • Users are also welcome to write to other systems (e.g. Redis, logs etc.) via a custom WriteExceptionHandler implementation
  7. Relatively straight-forward change to the codebase
  8. Minor: Doesn't require a specific record structure (isFailed parameter, PAYLOAD_KEY, etc.) for all records in "dead-letter-table mode" (unless the user needs it for their WriteExceptionHandler implementation)
    • This is minor because I imagine that at least 50% of use cases will want this, but I still think we can do better than the isFailed parameter, PAYLOAD_KEY, and the like.
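
To make pros 2 and 6 concrete, here's a hedged sketch of a handler that either hands failures to Kafka Connect's errant-record reporter or rewrites them for a per-topic dead-letter table. It assumes the hypothetical interface sketched above and a value shape that the user's own routing setup understands; none of these names come from this PR:

```java
// Illustrative only: assumes the hypothetical WriteExceptionHandler interface above.
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class PerTopicDeadLetterHandler implements WriteExceptionHandler {

  private SinkTaskContext context;

  @Override
  public void initialize(SinkTaskContext context, Map<String, String> config) {
    this.context = context;
  }

  @Override
  public SinkRecord handle(SinkRecord record, Exception exception) {
    // Option A: report to Kafka Connect's built-in dead-letter queue and drop the record
    // (requires errors.deadletterqueue.topic.name to be configured on the connector):
    //   context.errantRecordReporter().report(record, exception);
    //   return null;

    // Option B: rewrite the record so it gets routed to a per-topic table, e.g. "dlq.<topic>".
    // The value shape here is entirely up to the handler author.
    Map<String, Object> value =
        Map.of(
            "route", "dlq." + record.topic(),
            "topic", record.topic(),
            "partition", record.kafkaPartition(),
            "offset", record.kafkaOffset(),
            "error", String.valueOf(exception.getMessage()));
    return record.newRecord(
        record.topic(), record.kafkaPartition(), null, record.key(), null, value, record.timestamp());
  }
}
```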

Cons

  1. If users want to write Converter/SMT failures to the dead-letter-table, they need to use the connector in dynamic mode
    • Dynamic mode as it's implemented today only allows you to write to one table at a time, i.e. no table fanout.
    • However, we should be able to alleviate this issue by allowing users to provide a comma-separated list of tables in the route field (I'm not sure why this wasn't done in the first place, so we should double-check here). This should be a separate PR though.
    • Even if we can't alleviate this issue, I'm happy to accept this as a compromise (since the semantics are gonna be a little weird with multiple tables involved), but that's debatable.
    • @ajreid21 had another good idea on addressing the dynamic-mode constraint here. We could potentially change the connector a little bit so that in dynamic mode it defaults to looking at the route field, but if the route field doesn't exist, it falls back to some statically defined tables (just like in static mode).
  2. Users have to write a correct WriteExceptionHandler implementation
    • We can mitigate this by writing a sample WriteExceptionHandler implementation, similar to how we provide sample SMT implementations already
  3. Users have to make sure their WriteExceptionHandler implementation is available on the classpath
    • Users already have to make sure that Converter/SMT implementations are available on the classpath so this shouldn't be anything new to most users
    • If we really wanted to, we could include some WriteExceptionHandler implementations in the iceberg-kafka-connect project (but we should get some stable implementations first)

How to

It might be a little hard to see how people could use this feature for specific use cases so I've prepared a separate draft PR to show this: https://github.com/tabular-io/iceberg-kafka-connect/pull/244

fqtab commented 7 months ago

@tabmatfournier this is just a draft to illustrate the idea but feel free to ask any questions here. I don't intend to merge this; I assume you'll pull in any changes back into your PR if we go with this approach.

tabmatfournier commented 7 months ago

Thanks. I'll take a look

tabmatfournier commented 7 months ago

Not sure this is much of an improvement TBH. Very broad net.

The real problem is you have a bunch of errors in the writer .write method which happen long before you write a record to iceberg:

  • a laundry list of issues creating a table (table names, partition specs, etc) including values from hardcoded configs
  • partition evolution cases
  • iceberg record conversion issues

Actually writing (to an appender) is not the issue. But the above logic is split between the WriterFactory and the writer itself.

Might be worth pulling that out into distinct steps (catalogAPI does this for the first two cases, may be worth introducing a RecordConverter outside of the writer). It might make the wrapping you want more clear.

tabmatfournier commented 7 months ago

E.g. if a user wants a dead-letter-table per topic, users can just write a WriteExceptionHandler that sends bad records from different topics to different tables.

This is nice, but you also have to handle cases where you don't know the table the record is going to (because the table name itself is invalid and the catalog throws when trying to create it). It's not that simple.

tabmatfournier commented 7 months ago

7. Relatively straight-forward change to the codebase

Disagree, and there is functionality missing that, once added, will make this more complicated.

Current plan:

  1. refactor table creation/schema stuff more cleanly (this is mostly CatalogAPI in my PR)
  2. refactor record conversion into classes
  3. Introduce an interface for exception handling (per this PR)
  4. Look into modifying the special record shape -- mine creates a special struct; this suggests throwing it all onto the keys, but the fact remains it's still a special record that needs information to be in a specific place. This means the pluggable error handling in the ErrorTransform still needs to produce a special record, and since that is pluggable, you have to be careful: the user needs to know in their pluggable interface how to dig that record back out in the connector. At some point you have provided nothing but a footgun API -- imho, we have to form some opinions here, not just provide a bunch of interfaces that the user needs to "know" to glue together (even though it's not obvious in code, due to constraints forced on the system by Kafka Connect). If this can't be done cleanly, we should talk again to see what can be done. I don't think it's as easy as what has been presented in your two draft PRs.

tabmatfournier commented 7 months ago

  • Converter/SMT exceptions (i.e. things before SinkTask.put): users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code -- we are ignoring valid configurations).

I believe this to be a deal breaker.

tabmatfournier commented 7 months ago

Similarly, we (maintainers) don't have to worry about adding exception handling code to the connector

  • I'm particularly worried about REST catalogs which only really have a REST spec. There is no guarantee an exception thrown by one REST Catalog is non-transient versus another.

If that is the case, why are we providing any implementations at all? IMHO we can't get away with just providing the interface for users. We have to provide some amount of default implementation.

tabmatfournier commented 7 months ago

  • Users can have as many or as few dead letter tables as they want, just write an appropriate handler to do it

Is this something we want?

Remember: you will still need a default dead-letter table because there are many cases where you don't know where to send the record (error transform failures are a good example of this), so you are left with a config value or some function of the connector name. I would argue topic name is a poor choice, but you could do that if you wanted. It is a poor choice because the user may be running several connectors for the same topic and using different SMTs.

I'm not convinced you can just "write an appropriate handler to do it" because you often won't have any context for where to route to.

fqtab commented 7 months ago

Not sure this is much of an improvement TBH. Very broad net.

The real problem is you have a bunch of errors in the writer .write method which happen long before you write a record to iceberg:

  • a laundry list of issues creating a table (table names, partition specs, etc) including values from hardcoded configs
  • partition evolution cases
  • iceberg record conversion issues

Actually writing (to an appender) is not the issue. But the above logic is split between the WriterFactory and the writer itself.

Might be worth pulling that out into distinct steps (catalogAPI does this for the first two cases, may be worth introducing a RecordConverter outside of the writer). It might make the wrapping you want more clear.

Oh my bad, I threw this together pretty quickly. I thought most of the errors would only happen when we write SinkRecords to files, but if there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

fqtab commented 7 months ago

  • Converter/SMT exceptions (i.e. things before SinkTask.put): users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code -- we are ignoring valid configurations).

I believe this to be a deal breaker.

No problems, there are things we can do to mitigate this. Alex had a pretty good idea about allowing dynamic mode to fall back to static mapping if the route field is missing/null. I see that as a valuable feature for use cases outside of handling Converter/SMT failures, so I would be in favour of supporting that change to the connector code. That should address this concern entirely, unless I'm missing something else.
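
To pin down the semantics being discussed, here's a rough sketch of that fallback. The class, method, and field names are made up for illustration; only the iceberg.tables.* properties mentioned above are real config:

```java
// Illustration only: dynamic mode preferring the route field, falling back to statically
// configured tables when the field is missing or empty. Names are made up.
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class RouteWithStaticFallback {

  private final String routeField;         // value of iceberg.tables.route-field
  private final List<String> staticTables; // e.g. the statically configured table list

  RouteWithStaticFallback(String routeField, List<String> staticTables) {
    this.routeField = routeField;
    this.staticTables = staticTables;
  }

  List<String> tablesFor(Map<String, Object> recordValue) {
    Object route = recordValue == null ? null : recordValue.get(routeField);
    if (route instanceof String && !((String) route).trim().isEmpty()) {
      // Dynamic behaviour; a comma-separated value could also fan out to multiple tables,
      // per the earlier idea of allowing a list in the route field.
      return Arrays.asList(((String) route).split("\\s*,\\s*"));
    }
    // Route field absent or empty: behave like static mode.
    return staticTables;
  }
}
```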

tabmatfournier commented 7 months ago

there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

It's not that easy. Unfortunately, where you need to catch is a shotgun spread (worker, writer, writer factory, SchemaUtils, etc.).

tabmatfournier commented 7 months ago

  • Converter/SMT exceptions (i.e. things before SinkTask.put): users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code -- we are ignoring valid configurations). I believe this to be a deal breaker.

No problems, there are things we can do to mitigate this. Alex had a pretty good idea about allowing dynamic mode to fall back to static mapping if the route field is missing/null. I see that as a valuable feature for use cases outside of handling Converter/SMT failures, so I would be in favour of supporting that change to the connector code. That should address this concern entirely, unless I'm missing something else.

Not opposed but I can show you how this is tricky/challenging in a screenshare.

fqtab commented 7 months ago

If that is the case, why are we providing any implementations at all? IMHO we can't get away with just providing the interface for users. We have to provide some amount of default implementation.

Absolutely. That's why your PR is offering an ExceptionHandlingTransform. The only additional thing this PR requires is an appropriate WriteExceptionHandler, and we need to provide a good default implementation of that. We can do that correctly for Tabular's REST Catalog with a TabularWriteExceptionHandler, for example (plus Hive and anything else, up to us). For other REST catalogs? We can't offer anything of the sort without seeing their code.
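
As a sketch of what such a default could look like, here's one way a catalog-specific handler might classify exceptions. The class name and the choice of which exceptions count as non-transient are assumptions for illustration, not a proposal for any particular catalog:

```java
// Illustration only: dead-letter what this catalog knows to be non-transient, rethrow the rest.
import java.util.Map;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class ExampleCatalogWriteExceptionHandler implements WriteExceptionHandler {

  private SinkTaskContext context;

  @Override
  public void initialize(SinkTaskContext context, Map<String, String> config) {
    this.context = context;
  }

  @Override
  public SinkRecord handle(SinkRecord record, Exception exception) {
    if (exception instanceof ValidationException) {
      // A data problem this catalog treats as non-transient: report the record and drop it
      // (assumes the connector's dead-letter queue is configured so the reporter is non-null).
      context.errantRecordReporter().report(record, exception);
      return null;
    }
    // Anything we can't classify might be transient (e.g. an opaque REST error), so fail
    // the task rather than silently dead-lettering data that could have been retried.
    throw new ConnectException(exception);
  }
}
```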

fqtab commented 7 months ago

there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

It's not that easy. Unfortunately, where you need to catch is a shotgun spread (worker, writer, writer factory, SchemaUtils, etc.).

~Sure, let's pair later, sounds like I might have dodged a lot of the complexity by focusing on only the write path :D~

Scratch that, this should cover all errors coming from WriterFactory, IcebergWriter, and SchemaUtils as well? https://github.com/tabular-io/iceberg-kafka-connect/blob/925dbcb74627a9c53370db55ec8967677c5605a2/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/MultiTableWriter.java#L49
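
For what it's worth, the wrapping I have in mind there is roughly the following. This is a loose sketch around the linked line that reuses the hypothetical handler interface from the PR description; it is not the actual MultiTableWriter code:

```java
// Loose sketch: one try/catch around the per-record write path so failures from writer
// creation, schema handling, and record conversion all funnel through the handler.
import org.apache.kafka.connect.sink.SinkRecord;

class ExceptionHandlingWriter {

  /** Stand-in for the connector's per-table writer abstraction. */
  interface RecordWriter {
    void write(SinkRecord record);
  }

  private final RecordWriter delegate;
  private final WriteExceptionHandler handler;

  ExceptionHandlingWriter(RecordWriter delegate, WriteExceptionHandler handler) {
    this.delegate = delegate;
    this.handler = handler;
  }

  void write(SinkRecord record) {
    try {
      // Any exception thrown while resolving the table, converting the record, or appending
      // it surfaces here, as long as it escapes delegate.write(...).
      delegate.write(record);
    } catch (Exception e) {
      SinkRecord redirected = handler.handle(record, e);
      if (redirected != null) {
        // The handler chose to reroute the record, e.g. to a dead-letter table.
        delegate.write(redirected);
      }
    }
  }
}
```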