apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [KafkaSink] Failed to write data due to opening transaction #4247

Open · lightzhao opened this issue 1 year ago

lightzhao commented 1 year ago


What happened

When "semantics=EXACTLY_ONCE" is configured in kafkaSink, the data cannot be written successfully, and the checkpoint fails. The exception is as follows. 1.[org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.] 2.[Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.]

SeaTunnel Version

2.3.0

SeaTunnel Config

kafka {
      topic = "test01"
      bootstrap.servers = "localhost:9092"
      semantics = EXACTLY_ONCE
  }

Running Command

bin/start-seatunnel-flink-connector-v2.sh -m yarn-cluster --config config/kafka_test

Error Exception

1. org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2. Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

Flink or Spark Version

Flink 1.13.1

Java or Scala Version

No response

Screenshots

No response


linyu003 commented 1 year ago

I am looking into this issue now. I have reproduced the problem with the config below.

env {
    execution.parallelism = 1
    job.mode = "STREAMING"
    execution.checkpoint.interval = 10000
  }

  source {
    FakeSource {
      parallelism = 1
      result_table_name = "fake"
      split.read-interval = 1000
      row.num = 2
      schema = {
        fields {
          name = "string"
          age = "int"
        }
      }
    }
  }

  transform {
    sql {
      sql = "select name,age from fake"
    }
  }
  sink {
  kafka {
        topic = "test01"
        bootstrap.servers = "localhost:9092"
        semantics = EXACTLY_ONCE
    }
  }

It seems that the KafkaSinkCommitter does not work well if the transaction is empty (meaning the transaction has no records to commit). An empty transaction is a special case, so I want to just commit the transaction in KafkaSinkWriter.snapshotState, instead of working hard to make KafkaSinkCommitter compatible with it.
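For illustration, here is a minimal sketch of that idea using the plain Kafka producer API; the class, the recordsInTransaction flag, and the method names are hypothetical stand-ins, not SeaTunnel's actual writer code.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical sketch: track whether the current transaction holds any
    // records; if it is empty at checkpoint time, commit it in the writer
    // itself instead of handing it to the committer.
    public class EmptyTransactionSketch {
        private final KafkaProducer<String, String> producer;
        private boolean recordsInTransaction = false; // hypothetical flag

        public EmptyTransactionSketch(Properties props) {
            this.producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
        }

        public void write(ProducerRecord<String, String> record) {
            producer.send(record);
            recordsInTransaction = true;
        }

        // Called when a checkpoint is triggered (analogous to snapshotState).
        public void onCheckpoint() {
            if (!recordsInTransaction) {
                // Empty transaction: commit it right here, so the committer
                // never sees this special case, then start the next one.
                producer.commitTransaction();
                producer.beginTransaction();
            } else {
                // Non-empty transaction: flush and leave the commit to the
                // committer, as in the usual two-phase-commit flow. (In the
                // real writer, a fresh producer/transaction is then created
                // for the next checkpoint interval.)
                producer.flush();
                recordsInTransaction = false;
            }
        }
    }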

I can submit a PR for this in a few days.

linyu003 commented 1 year ago

I found more problems while trying to solve this issue.

Reproduce the Problems

seatunnel branch: dev, revision: b8c6bbd1e6dea6462d419d6c7adf20f04d7a2430

Add the following dependency to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:

        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-kafka</artifactId>
            <version>${project.version}</version>
        </dependency>

Create the file seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_kafka.conf:

env {
    execution.parallelism = 1
    job.mode = "STREAMING"
    execution.checkpoint.interval = 10000
  }

source {
    FakeSource {
      parallelism = 1
      result_table_name = "fake"
      split.read-interval = 1000
#       split.num = 60
      row.num = 2
      schema = {
        fields {
          name = "string"
          age = "int"
        }
      }
    }
  }

transform {}

sink {
  kafka {
        topic = "test01"
        bootstrap.servers = "localhost:9092"
        semantics = EXACTLY_ONCE
    }
}

Build the seatunnel-examples module in IDEA, then run main() of seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java with the first argument /examples/fake_to_kafka.conf.

Wait less than a minute and you will see the problem.

A sample run log: issue4247.log

Problems

The problems below concern KafkaTransactionSender, not KafkaNoTransactionSender.

Problem 1

Line 913 in the log:

2023-03-31 20:02:25,834 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel5627-1, transactionalId=SeaTunnel5627-1] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.

Reason

When a checkpoint is triggered, KafkaSinkWriter#snapshotState is called, and there producer.close() is called before a new producer is created. (Note: I found that producer.close() is newly added code in KafkaTransactionSender#getTransactionProducer.)

The producer.close() implementation tries to close gracefully, which aborts the current transaction. When the committer later tries to commit the transaction, an error occurs because the transaction has already been aborted.

Solution

Call kafkaProducer.close(Duration.ZERO) instead of kafkaProducer.close(); the former closes immediately without aborting the current transaction.
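A minimal sketch of the difference, using the public Kafka producer API (the transactional.id value and the class name here are just placeholders):

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    public class CloseTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "SeaTunnel-demo-1"); // placeholder id
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();

            // producer.close() shuts down gracefully, which (per the analysis
            // above) aborts the in-flight transaction, so the committer's
            // later commitTransaction() hits InvalidTxnStateException.
            // producer.close(Duration.ZERO) tears the producer down
            // immediately and leaves the open transaction alone:
            producer.close(Duration.ZERO);
        }
    }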

Problem 2

At the end of the log file, you can see

2023-03-31 20:02:57,291 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-2
2023-03-31 20:02:57,295 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-3
2023-03-31 20:02:57,296 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-4
2023-03-31 20:02:57,298 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-5
2023-03-31 20:02:57,299 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-6
2023-03-31 20:02:57,301 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-7

It is an endless loop: the sender keeps aborting transaction after transaction.

Reason

In KafkaTransactionSender#abortTransaction(long), it tries to abort every transaction whose id >= checkpointId, and the loop does not stop until the producer epoch is 0. However, the epoch in the transaction manager is not updated even when the transactionalId is set to another value, so an endless loop occurs whenever the epoch is not 0 initially.
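Reconstructed from this description and the snippet quoted later in the thread (not the exact source), the loop has roughly the following shape; the TransactionalProducer interface and getEpoch() accessor are stand-ins for SeaTunnel's internal producer wrapper:

    public class AbortLoopSketch {
        // Minimal stand-in for SeaTunnel's internal transactional producer.
        interface TransactionalProducer {
            void setTransactionalId(String transactionalId);
            void flush();
            short getEpoch(); // assumed accessor for the cached producer epoch
        }

        // The exit test keys off the producer epoch, but switching the
        // transactional id does not refresh the epoch the wrapper sees, so
        // when the epoch starts out non-zero the loop never terminates.
        static void abortTransactions(TransactionalProducer producer,
                                      String transactionPrefix, long checkpointId) {
            for (long i = checkpointId; ; i++) {
                String transactionId = transactionPrefix + "-" + i;
                producer.setTransactionalId(transactionId);
                producer.flush(); // note: flush() does not abort anything
                if (producer.getEpoch() == 0) {
                    break; // never reached while the cached epoch stays non-zero
                }
            }
        }
    }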

Solution

Since beginning a new transaction fences all transactions before it, aborting every historical transaction is unnecessary. Moreover, KafkaTransactionSender#abortTransaction(long) only calls producer.flush() and never calls producer.abortTransaction(). I removed the for-loop and call producer.abortTransaction() instead of producer.flush(), as sketched below.
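A sketch of the simplified abort under the same stand-in interface as above; because a new transaction fences everything before it, a single abortTransaction() on the current id is enough:

    public class AbortFixSketch {
        // Same stand-in wrapper as before, now with abortTransaction().
        interface TransactionalProducer {
            void setTransactionalId(String transactionalId);
            void abortTransaction();
        }

        // No for-loop over historical transaction ids, and an actual abort
        // instead of a flush().
        static void abortTransaction(TransactionalProducer producer, String transactionId) {
            producer.setTransactionalId(transactionId);
            producer.abortTransaction();
        }
    }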

Problem 3

After fixing problems 1 and 2 and rerunning the example application, you will get the problem originally reported in this issue by @lightzhao.

As I said before:

It seems that the KafkaSinkCommitter does not work well if the transaction is empty (meaning the transaction has no records to commit). An empty transaction is a special case, so I want to just commit the transaction in KafkaSinkWriter.snapshotState, instead of working hard to make KafkaSinkCommitter compatible with it.

I will submit my PR later.

agp5050 commented 2 weeks ago

    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-kafka</artifactId>
    <name>SeaTunnel : Connectors V2 : Kafka</name>
    <version>2.3.9-SNAPSHOT</version>

This problem is still not fixed. The loop runs endlessly.

agp5050 commented 2 weeks ago
    String transactionId = generateTransactionId(this.transactionPrefix, i);
    producer.setTransactionalId(transactionId);
    if (log.isDebugEnabled()) {
        log.debug("Abort kafka transaction: {}", transactionId);
    }
    producer.flush();

Just modifying KafkaTransactionSender fixes it.