tabular-io / iceberg-kafka-connect

Apache License 2.0

Kafka Connect DLQ options on this sink connector are NOT working in the sink phase #152

Open okayhooni opened 11 months ago

okayhooni commented 11 months ago

I tested the DLQ options on this sink connector against a table with a deliberately wrong schema.

But it didn't work as I expected:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: order.prod.streaming.order-created.avro, partition, 2, offset: 112705806
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    ... 11 more
Caused by: java.lang.IllegalArgumentException: Cannot convert to struct: java.lang.String
    at io.tabular.iceberg.connect.data.RecordConverter.convertStructValue(RecordConverter.java:159)
    at io.tabular.iceberg.connect.data.RecordConverter.convertValue(RecordConverter.java:115)
    at io.tabular.iceberg.connect.data.RecordConverter.lambda$convertToStruct$1(RecordConverter.java:236)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
    at io.tabular.iceberg.connect.data.RecordConverter.convertToStruct(RecordConverter.java:203)
    at io.tabular.iceberg.connect.data.RecordConverter.convertStructValue(RecordConverter.java:157)
    at io.tabular.iceberg.connect.data.RecordConverter.convert(RecordConverter.java:98)
    at io.tabular.iceberg.connect.data.IcebergWriter.convertToRow(IcebergWriter.java:86)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:62)
    ... 19 more

Is it that the Kafka Connect DLQ cannot handle the PUT lifecycle stage, as shown below?

image

REF: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

If errors in the sink phase are not handled by the native Kafka Connect DLQ feature, then how about adding a similar DLQ option at the connector level, one that can handle records raising errors in the PUT lifecycle stage? Something like iceberg.dlq.topic!
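
For reference, a minimal sketch of the native Kafka Connect error-handling properties in question (the DLQ topic name here is a placeholder); per the post linked above, these only cover the convert and transform stages, so they do not catch failures raised in put():

    # standard Kafka Connect sink error-handling / DLQ options
    errors.tolerance=all
    errors.log.enable=true
    errors.log.include.messages=true
    # route failed records to a dead letter queue topic (placeholder name)
    errors.deadletterqueue.topic.name=iceberg-sink-dlq
    errors.deadletterqueue.topic.replication.factor=3
    errors.deadletterqueue.context.headers.enable=true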

bryanck commented 11 months ago

You are correct, a conversion failure will throw an error rather than route to the DLQ. Other sinks have an option to skip "bad" records, which is something we could add. Also, you can manually skip the record by setting the consumer group offset.
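
For example, to skip the single bad record from the trace above (partition 2, offset 112705806), a sketch using the standard kafka-consumer-groups.sh tool; the group name assumes the default connect-<connector-name> naming, and the connector should be stopped before resetting:

    # advance the sink's consumer group past the bad record (to offset 112705807)
    kafka-consumer-groups.sh --bootstrap-server <broker:9092> \
      --group connect-<connector-name> \
      --topic order.prod.streaming.order-created.avro:2 \
      --reset-offsets --to-offset 112705807 \
      --execute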

okayhooni commented 11 months ago

Thank you for quick answer!

bryanck commented 11 months ago

You can leave this open if you want and we can consider options to handle this best.

okayhooni commented 11 months ago

Thanks! I reopened this issue!

I hope this new feature can re-produce those failed messages to a separate DLQ topic (just as this sink produces to the control topic), so they can be re-consumed from that DLQ topic after reconfiguring this sink connector or manually altering the target table.

skushwaha-mdsol commented 8 months ago

Hi @bryanck, we are also trying to solve this conversion failure issue in the connector, so I was wondering whether there is a way to skip the records that throw the conversion failure error.

org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: datasets-test, partition, 0, offset: 10

Connector configuration:

    connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
    topics=datasets-test
    tasks.max=2
    iceberg.catalog=AwsDataCatalog
    iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.warehouse=s3://data-os-sandbox
    iceberg.catalog.client.region=us-east-1
    iceberg.tables.dynamic-enabled=true
    iceberg.tables.route-field=iceberg_table
    iceberg.tables.cdc-field=_cdc_op
    iceberg.tables.default-id-columns=record_id
    iceberg.control.topic=control-iceberg-test
    iceberg.control.group-id=cg-control-iceberg-kafka-new-config
    iceberg.control.commit.timeout-ms=60000
    key.converter.region=us-east-1
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.region=us-east-1
    value.converter.schemas.enable=false
    errors.tolerance=all
    errors.log.enable=true
    errors.log.include.messages=false
    errors.retry.timeout=600000
    errors.retry.delay.max.ms=30000
    errors.deadletterqueue.topic.name=datasets-error
    errors.deadletterqueue.context.headers.enable=true

bryanck commented 8 months ago

The only way to do it now is to manually set the partition offset ahead of the bad record(s). I'll look into adding this as a config option in a day or two.

bryanck commented 8 months ago

I gave this some thought and I am reluctant to add an option to simply skip a bad record, as it could lead to unexpected data loss. We should have the ability to write the record to a DLQ or table. This was recently opened, which is related.

mddunn commented 7 months ago

Opened this issue as well for the same functionality: https://github.com/tabular-io/iceberg-kafka-connect/issues/191