databricks / iceberg-kafka-connect

Apache License 2.0
213 stars · 47 forks

Which version of Kafka does this connector support? #273

Closed arshahmad1 closed 2 months ago

arshahmad1 commented 3 months ago

Hi guys, I'm using this connector to sink data from Kafka to S3 as Iceberg tables, but I'm getting the following exception:

java.lang.NoSuchMethodError: 'org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions.requireStable(boolean)'

What version of Kafka is supported by this connector? My AWS managed Kafka (MSK) version is 3.5.1.

Also, I tried downgrading my Kafka to 2.8.2 and I'm still getting the same error.

arshahmad1 commented 3 months ago

Hey @tabmatfournier, sorry to ping you directly. Can you please help me here?

antcalvente commented 3 months ago

I'm facing the same issue on my side as well (using MSK Kafka version 3.7.1). It looks like it may be an issue with transitive dependencies: kafka-ver is declared as 3.5.1, but the requireStable method was only introduced in version 3.3 (https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.html#requireStable(boolean)).

I had to downgrade the connector to older versions to avoid this issue, but then I faced other problems, like constant re-joining of the group and commit timeouts.

I tried to force the version with Gradle resolution strategies and constraints, but even though the build reports the correct version, at runtime it appears to resolve to something prior to 3.3.
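For reference, the kind of pin I tried looks roughly like this (a build.gradle fragment; the coordinates are the standard kafka-clients ones, and as noted above it didn't actually hold at runtime for me):

```groovy
// build.gradle -- attempt to force the transitive kafka-clients version
configurations.all {
    resolutionStrategy {
        // pin kafka-clients to the version the project declares
        force 'org.apache.kafka:kafka-clients:3.5.1'
    }
}
```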

Full stacktrace:

[Worker-0e0622204e5dac11a] [2024-07-11 14:20:35,357] ERROR [SinkConnector|task-0] WorkerSinkTask{id=SinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0e0622204e5dac11a] java.lang.NoSuchMethodError: 'org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions.requireStable(boolean)'
[Worker-0e0622204e5dac11a]  at io.tabular.iceberg.connect.channel.CommitterImpl.fetchStableConsumerOffsets(CommitterImpl.java:116)
[Worker-0e0622204e5dac11a]  at io.tabular.iceberg.connect.channel.CommitterImpl.<init>(CommitterImpl.java:97)
[Worker-0e0622204e5dac11a]  at io.tabular.iceberg.connect.channel.CommitterImpl.<init>(CommitterImpl.java:70)
[Worker-0e0622204e5dac11a]  at io.tabular.iceberg.connect.channel.TaskImpl.<init>(TaskImpl.java:37)
[Worker-0e0622204e5dac11a]  at io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:56)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:640)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:705)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
[Worker-0e0622204e5dac11a]  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0e0622204e5dac11a]  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0e0622204e5dac11a]  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0e0622204e5dac11a]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0e0622204e5dac11a]  at java.base/java.lang.Thread.run(Thread.java:829)
ron-damon commented 3 months ago

Hi guys,

IIUC the problem is MSK Connect's version¹ (2.7.1), not MSK per se. We're using the sink connector (v0.6.19) against MSK (Kafka v3.5.1) without any problem, but we had to move from MSK Connect to a non-managed Kafka Connect.

¹ From Amazon Managed Streaming for Apache Kafka's Developer Guide:

What is MSK Connect? MSK Connect is a feature of Amazon MSK that makes it easy for developers to stream data to and from their Apache Kafka clusters. MSK Connect uses Kafka Connect 2.7.1, an open-source framework for connecting Apache Kafka clusters with external systems such as databases, search indexes, and file systems. ...

antcalvente commented 3 months ago

Hi @ron-damon, agreed: the MSK server version "shouldn't" be related to this issue. I'm currently using my own build of the connector with MSK Connect and it works wonderfully, but I had to do three things.

The first two are simple and I'll open a PR for them ASAP (edit: https://github.com/tabular-io/iceberg-kafka-connect/pull/275):

This solves the disconnection issues I was having with remote Kafka servers or MSK; now it connects without any issue (this didn't happen on a local Kafka for obvious reasons, since the connection between the connector and the Kafka server is immediate). I haven't seen any downside to increasing the timeout, other than it being a fairly arbitrary value that happens to work for MSK (a timeout is already set in the code anyway); still, it's a magic number that works for my case...

And the last one is a trick to make the connector compatible with older kafka-clients and solve the issue I posted above for Kafka versions < 3.3:
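A sketch of what that trick amounts to in CommitterImpl.fetchStableConsumerOffsets (the exact surrounding code may differ; the trade-off is that without requireStable(true), the fetched offsets are no longer guaranteed to exclude in-flight transactional commits):

```diff
-    ListConsumerGroupOffsetsOptions options =
-        new ListConsumerGroupOffsetsOptions().requireStable(true);
+    // requireStable(boolean) only exists in kafka-clients >= 3.3; dropping it
+    // keeps the call compatible with the 2.7.1 client bundled with MSK Connect
+    ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
```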

Sharu95 commented 3 months ago

Stumbled upon this while experiencing the same issue on the MSK broker I'm working with: MSK Connect version 2.7.1, MSK Kafka version 2.6.2, and unfortunately it's not possible for me to update them myself.

requireStable was introduced in 3.3, it seems, so I'm getting the same issue as @arshahmad1: no such method found 😅

Any backward-compatible implementation would be amazing! @antcalvente, if you remove requireStable, how would offset stability be handled? 🤔

antcalvente commented 3 months ago

Hi @Sharu95! So far I have generated more than 1 TB of data (around 3 billion messages) across different topics and, apart from some rebalance messages at the beginning when creating the connector, everything runs smoothly. I have a control topic for each topic (which is not necessary per the documentation). The only major error I have faced is when recreating the main topic while the connector is running in the background: the connector dies and needs to be recreated manually. In that specific case the consumer goes a bit crazy, and even after recreating it does weird things like having clashes and marking new messages as duplicates. I'd suggest you fork the repo and remove that requireStable(true) yourself, in case it unblocks you until a maintainer of the project takes a look at the issue (that worked for me).

In any case, as ron mentioned before, the issue seems to come from a dependency of this project/MSK Connect and not your MSK server version :)

Sharu95 commented 3 months ago

Thanks for the reply @antcalvente! I've forked it and will stick to that version for now. I'm just exploring currently, so nothing is urgently blocking, really :) I can't do much about the MSK/Connect versions anyway, so keeping a fork is a good workaround until things are fixed.

arshahmad1 commented 3 months ago

Hey @antcalvente, thank you so much for taking the time and providing such a detailed response. As you mentioned above, I did the following and it stopped throwing the NoSuchMethodError:

  1. Increased initial polling duration on CommitterImpl.java#L104 to 30s
  2. Increased initial polling duration on Coordinator.java#L93 to 30s
  3. Removed requireStable(true) from CommitterImpl.fetchStableConsumerOffsets()
  4. Then I ran ./gradlew -xtest clean build
  5. The zip archive ends up under ./kafka-connect-runtime/build/distributions

Thanks again for your response and help!

arshahmad1 commented 3 months ago

@antcalvente, can you please share your configuration for reference? I searched the internet and there are no reliable or precise examples. It would be of great help. Thanks in advance.

antcalvente commented 3 months ago

Here you go @arshahmad1 : https://gist.github.com/antcalvente/27d248019665c260e7c155d4d0860341

Hope it helps :)

Sharu95 commented 3 months ago

@arshahmad1, I'm using the Glue catalog in AWS. As per the documentation, remember to set "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog" instead of iceberg.catalog: glue.

edit: These are the required options for Glue: https://github.com/tabular-io/iceberg-kafka-connect?tab=readme-ov-file#glue-example
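A minimal shape of those Glue options, following the README's Glue example (the bucket name and warehouse path are placeholders):

```json
{
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
  "iceberg.catalog.warehouse": "s3://bucket-name/warehouse",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
}
```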

arshahmad1 commented 3 months ago

Actually @Sharu95, I'm using the configuration below, but currently I'm running into: Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Invalid table identifier: poc-kafka-flink.mskmytopicma_ib (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

Sharu95 commented 2 months ago

@arshahmad1, what's the full stack trace?

I only use iceberg.tables=<my_database>.<my_table>, not the other options (iceberg.glue.database and iceberg.glue.table). Also, double-check the database name and whether hyphens might be causing the issue.

Also make sure the control topic is created. I can't see that you've specified one here, so I assume you have created it, but the full stack trace would be helpful 😄

arshahmad1 commented 2 months ago

I was able to solve this issue: you should NOT use underscores or hyphens in your database and table names 🤦
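Taking that conclusion at face value, a quick sanity check for identifiers could look like this (isSafeIdentifier is a hypothetical helper for illustration, not part of the connector; it accepts only dot-separated alphanumeric segments):

```java
public class TableNameCheck {
    // Returns true only for names made of alphanumeric segments separated by
    // dots -- i.e. no hyphens or underscores, which triggered the
    // "Invalid table identifier" error above.
    static boolean isSafeIdentifier(String name) {
        return name.matches("[A-Za-z0-9]+(\\.[A-Za-z0-9]+)*");
    }

    public static void main(String[] args) {
        System.out.println(isSafeIdentifier("pockafkaflink.mskmytopicmaib"));    // true
        System.out.println(isSafeIdentifier("poc-kafka-flink.mskmytopicma_ib")); // false
    }
}
```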