apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [connector-kafka] kafka sink error when set semantics = EXACTLY_ONCE #5836

Closed zhaoli2333 closed 9 months ago

zhaoli2333 commented 11 months ago

What happened

I started a streaming task syncing data from TiDB to Kafka and set the sink config semantics = EXACTLY_ONCE. Although the data has been written into Kafka, the task keeps restarting with the error: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.

SeaTunnel Version

2.3.3

SeaTunnel Config

env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  execution.checkpoint.interval = 5000
  execution.checkpoint.data-uri = "hdfs://localhost:8020/seatunnel/checkpoint"
}

source {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/bigdata"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        query = "select * from table_name"
        partition_column= "id"
        partition_num = 10
    }
}

sink {
  kafka {
      topic = "topic_name"
      bootstrap.servers = "localhost:9092"
      format = json
      semantics = EXACTLY_ONCE
      kafka.request.timeout.ms = 60000
      kafka.config = {
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism":"SCRAM-SHA-256",
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";"
        acks = "all"
        request.timeout.ms = 60000
        buffer.memory = 33554432
      }
  }
}

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config example/v2.batch.config.tidb2kafka

Error Exception

org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
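
For readers unfamiliar with this class of error: the message says a transactional operation was attempted in a state that does not permit it. The sketch below is only a toy state machine illustrating that kind of ordering constraint. `ToyTransactionalProducer`, `TxnState`, and `InvalidTxnState` are hypothetical names invented for illustration; this is not Kafka client or SeaTunnel code, and it does not establish the root cause of this issue.

```python
from enum import Enum, auto


class TxnState(Enum):
    UNINITIALIZED = auto()
    READY = auto()
    IN_TRANSACTION = auto()


class InvalidTxnState(Exception):
    """Raised by the toy model when an operation is not allowed in the current state."""


class ToyTransactionalProducer:
    """Hypothetical stand-in for a transactional producer; not the Kafka client."""

    def __init__(self):
        self.state = TxnState.UNINITIALIZED
        self.pending = []    # records sent inside the open transaction
        self.committed = []  # records made visible by a successful commit

    def _require(self, expected, op):
        # Reject any operation that arrives in the wrong state.
        if self.state is not expected:
            raise InvalidTxnState(f"{op} not allowed in state {self.state.name}")

    def init_transactions(self):
        self._require(TxnState.UNINITIALIZED, "init_transactions")
        self.state = TxnState.READY

    def begin_transaction(self):
        self._require(TxnState.READY, "begin_transaction")
        self.state = TxnState.IN_TRANSACTION

    def send(self, record):
        self._require(TxnState.IN_TRANSACTION, "send")
        self.pending.append(record)

    def commit_transaction(self):
        self._require(TxnState.IN_TRANSACTION, "commit_transaction")
        self.committed.extend(self.pending)
        self.pending.clear()
        self.state = TxnState.READY
```

In this toy model, a second `commit_transaction()` without a new `begin_transaction()` raises `InvalidTxnState`, even though the first commit already made the records visible. Whether a comparable sequence occurs in the reported job is an open question that this issue does not answer.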

Zeta or Flink or Spark Version

flink 1.16

Java or Scala Version

1.8

Screenshots

No response

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 9 months ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.