apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Kafka] Use EXACTLY_ONCE commit error for kafka sink #7755

Open hailin0 opened 1 month ago

hailin0 commented 1 month ago

What happened

https://mp.weixin.qq.com/s/DSv76j1riGMNBkR0VWNEhg

The Kafka client.id is duplicated between the writer and the committer.

SeaTunnel Version

dev

SeaTunnel Config

-

Running Command

-

Error Exception

-

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

corgy-w commented 1 month ago

Supplement: I recently tested Zeta as well and hit the corresponding error. I guess this is the Zeta error described in the article.

liunaijie commented 1 month ago

Related fix: https://github.com/apache/seatunnel/pull/4469#issuecomment-2372833258, but it has not been updated for a long time. @fcb-xiaobo Hi, can you help fix this issue?

Carl-Zhou-CN commented 1 month ago

I also looked into it and provided some information: the writer and the committer both hold a producer with the same client.id, and the two conflict.
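
A minimal sketch of the direction discussed here, not SeaTunnel's actual code: if the writer and the committer each build a producer from the same sink configuration, appending a role suffix to client.id keeps the two apart. `client.id`, `bootstrap.servers`, and `transactional.id` are standard Kafka producer config keys; the `ProducerIds` class, the `forRole` method, the suffix scheme, and the `kafka-sink` default prefix are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: gives the writer's and committer's producers distinct client.id values. */
public final class ProducerIds {

    private ProducerIds() {}

    /**
     * Copies {@code base} and appends the role and subtask index to its client.id,
     * so two producers built from the same sink config no longer share one.
     * Every other key, including any transactional.id, is copied unchanged.
     */
    public static Properties forRole(Properties base, String role, int subtaskIndex) {
        Properties props = new Properties();
        props.putAll(base);
        // Fall back to an arbitrary (hypothetical) prefix if the user did not set client.id.
        String prefix = base.getProperty("client.id", "kafka-sink");
        props.setProperty("client.id", prefix + "-" + role + "-" + subtaskIndex);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");
        base.setProperty("client.id", "my-sink");

        Properties writer = forRole(base, "writer", 0);
        Properties committer = forRole(base, "committer", 0);

        System.out.println(writer.getProperty("client.id"));    // my-sink-writer-0
        System.out.println(committer.getProperty("client.id")); // my-sink-committer-0
    }
}
```

The sketch deliberately leaves transactional.id alone: it only targets the client.id collision, and whether the committer must reuse the writer's transactional.id depends on how the sink resumes transactions, which this thread does not settle.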

fcb-xiaobo commented 1 month ago

I am currently trying to reproduce this issue, and I will try to fix it later.

corgy-w commented 1 month ago

@fcb-xiaobo Configuring 'semantics = EXACTLY_ONCE' is enough to reproduce this problem. The PR referenced by @liunaijie is a good idea but needs some optimization; I briefly tried it.
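
A rough sketch of what the `semantics` option changes at the Kafka level, assuming the sink maps EXACTLY_ONCE onto a transactional producer (the usual way to get exactly-once writes into Kafka) and the other modes onto a plain producer. `transactional.id`, `enable.idempotence`, and `acks` are standard Kafka producer configs; the `SemanticsSketch` class, the enum (assumed to mirror the option's EXACTLY_ONCE / AT_LEAST_ONCE / NON values), and the prefix-plus-checkpoint-id naming are hypothetical, not taken from SeaTunnel's source.

```java
import java.util.Properties;

/** Hypothetical sketch: how a sink's semantics setting might translate into Kafka producer properties. */
public final class SemanticsSketch {

    /** Assumed to mirror the values accepted by the Kafka sink's semantics option. */
    enum Semantics { EXACTLY_ONCE, AT_LEAST_ONCE, NON }

    static Properties producerProps(Semantics semantics, String transactionPrefix, long checkpointId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        switch (semantics) {
            case EXACTLY_ONCE:
                // Transactional producer: records become visible to read_committed
                // consumers only after the transaction is committed.
                // Hypothetical naming scheme: one transactional.id per checkpoint.
                props.setProperty("transactional.id", transactionPrefix + "-" + checkpointId);
                // Transactions require idempotence, which in turn requires acks=all.
                props.setProperty("enable.idempotence", "true");
                props.setProperty("acks", "all");
                break;
            case AT_LEAST_ONCE:
                // No transactional.id: records are written directly, acknowledged by all in-sync replicas.
                props.setProperty("acks", "all");
                break;
            default:
                // NON: keep the client defaults.
                break;
        }
        return props;
    }

    public static void main(String[] args) {
        for (Semantics s : Semantics.values()) {
            System.out.println(s + " -> " + producerProps(s, "seatunnel-tx", 1L));
        }
    }
}
```

Under this assumption only EXACTLY_ONCE goes through the producer's initTransactions/commitTransaction path, which is where a transactional-state error such as InvalidTxnStateException would surface.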

corgy-w commented 1 month ago

@fcb-xiaobo Hi, is there any progress here?

fcb-xiaobo commented 1 month ago

The code has been modified locally, but the e2e test case running on Docker still has issues, which I am working on resolving.

gitfortian commented 1 month ago

This problem also occurs in at_least_once mode.

agp5050 commented 3 weeks ago

[903461970382946316] 2024-10-29 10:01:43,713 ERROR [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-268] - notify checkpoint completed failed
java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1286) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandler.notifyErrorResponse(InboundResponseHandler.java:145) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandler.accept(InboundResponseHandler.java:101) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier$ResponseThread.doRun(InboundResponseHandlerSupplier.java:297) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier$ResponseThread.executeRun(InboundResponseHandlerSupplier.java:284) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.8]
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$runInternal$0(CheckpointFinishedOperation.java:99) ~[seatunnel-starter.jar:2.3.8]
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.8]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.runInternal(CheckpointFinishedOperation.java:81) ~[seatunnel-starter.jar:2.3.8]
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.8]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.8]
	... 1 more

aiyi926 commented 2 days ago

Version 2.3.8: this problem still exists. When I tested sinking from Doris or ClickHouse to Kafka, I got this error: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

corgy-w commented 2 days ago

Please wait for https://github.com/apache/seatunnel/pull/7857. @aiyi926, you can follow that PR and leave comments there.