apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0
2.46k stars 966 forks source link

[e2e] Fix kafka consumer concurrent exception #4261

Closed Shadowell closed 2 months ago

Shadowell commented 2 months ago

Purpose

Linked issue: close #xxx

Fix concurrent exception while building E2E Flink Test / Flink CDC

Error: Exception in thread "Debug Logging Timer" java.lang.RuntimeException: Failed to execute logging action at org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase$2.run(KafkaActionITCaseBase.java:206) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2490) at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2474) at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2175) at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2154) at org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase.logTopicPartitionStatus(KafkaActionITCaseBase.java:242) at org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase.lambda$setup$0(KafkaActionITCaseBase.java:179) at org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase$2.run(KafkaActionITCaseBase.java:204) ... 2 more

Tests

No

API and Format

No

Documentation

No