apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug][Connector-v2][KafkaSource] Fix the abnormal exit of KafkaConsumerThread. #7273

Open lightzhao opened 1 month ago

lightzhao commented 1 month ago

Purpose of this pull request

When the Kafka cluster or coordinator fails, a connection exception is thrown. After this exception is caught, it should not be rethrown: rethrowing causes the Kafka consumer thread to exit, and once the Kafka cluster recovers, the task can no longer consume data.
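The difference between the two behaviors can be sketched with a small standalone simulation (hypothetical names, not the actual SeaTunnel code; `pollOrThrow` stands in for a Kafka poll that fails while the cluster is down and succeeds after recovery):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerLoopSketch {

    // Simulated poll: throws while the "cluster" is down (first 3 calls), then succeeds.
    static int pollOrThrow(AtomicInteger attempts) {
        if (attempts.incrementAndGet() <= 3) {
            throw new RuntimeException("connection to the Kafka coordinator lost");
        }
        return 1; // one record "consumed"
    }

    // Current behavior: the first exception escapes, so the consumer loop (thread) dies.
    static int runRethrowing(AtomicInteger attempts, int iterations) {
        int consumed = 0;
        for (int i = 0; i < iterations; i++) {
            consumed += pollOrThrow(attempts); // exception propagates and the loop exits
        }
        return consumed;
    }

    // Proposed behavior: catch and log, keep polling; consumption resumes on recovery.
    static int runCatching(AtomicInteger attempts, int iterations) {
        int consumed = 0;
        for (int i = 0; i < iterations; i++) {
            try {
                consumed += pollOrThrow(attempts);
            } catch (RuntimeException e) {
                System.err.println("poll failed, retrying: " + e.getMessage());
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        AtomicInteger a = new AtomicInteger();
        try {
            runRethrowing(a, 5);
        } catch (RuntimeException e) {
            System.out.println("rethrowing loop died after " + a.get() + " attempt(s)");
        }
        int consumed = runCatching(new AtomicInteger(), 5);
        System.out.println("catching loop consumed " + consumed + " record(s)");
    }
}
```

With the catching variant, the loop survives the three simulated failures and consumes the records that arrive after "recovery"; with the rethrowing variant, the loop dies on the first failure.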

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

lightzhao commented 1 month ago

> If we modify the logic like this, is it possible that the Kafka cluster is abnormal, but SeaTunnel cannot exit normally?

Yes, the expectation is that the consumer thread will not exit after the Kafka cluster fails and will continue to execute after recovery; otherwise all tasks would need to be restarted.

Hisoka-X commented 1 month ago

> If we modify the logic like this, is it possible that the Kafka cluster is abnormal, but SeaTunnel cannot exit normally?
>
> Yes, the expectation is that the consumer thread will not exit after the Kafka cluster fails and will continue to execute after recovery; otherwise all tasks would need to be restarted.

In this case, users cannot use the SeaTunnel job status to determine whether the data reading is running normally. This is unacceptable.

lightzhao commented 1 month ago

> In this case, users cannot use the SeaTunnel job status to determine whether the data reading is running normally. This is unacceptable.

OK, so SeaTunnel expects that after the Kafka cluster fails, the job should be restarted to recover, right?

Hisoka-X commented 1 month ago

> In this case, users cannot use the SeaTunnel job status to determine whether the data reading is running normally. This is unacceptable.
>
> OK, so SeaTunnel expects that after the Kafka cluster fails, the job should be restarted to recover, right?

It should be that after the Kafka cluster fails, the synchronization job can no longer run normally, so the job should fail.

lightzhao commented 1 month ago

> In this case, users cannot use the SeaTunnel job status to determine whether the data reading is running normally. This is unacceptable.
>
> OK, so SeaTunnel expects that after the Kafka cluster fails, the job should be restarted to recover, right?
>
> It should be that after the Kafka cluster fails, the synchronization job can no longer run normally, so the job should fail.

OK, but the current mechanism is that the consumer thread just exits while the job does not fail and continues to run. I think this is unreasonable.

corgy-w commented 1 month ago

> In this case, users cannot use the SeaTunnel job status to determine whether the data reading is running normally. This is unacceptable.
>
> OK, so SeaTunnel expects that after the Kafka cluster fails, the job should be restarted to recover, right?
>
> It should be that after the Kafka cluster fails, the synchronization job can no longer run normally, so the job should fail.
>
> OK, but the current mechanism is that the consumer thread just exits while the job does not fail and continues to run. I think this is unreasonable.

+1, I am focused on this problem too.

Hisoka-X commented 1 month ago

> OK, but the current mechanism is that the consumer thread just exits while the job does not fail and continues to run. I think this is unreasonable.

Yes, it's a bug. Could you help to fix it?

lightzhao commented 1 month ago

> OK, but the current mechanism is that the consumer thread just exits while the job does not fail and continues to run. I think this is unreasonable.
>
> Yes, it's a bug. Could you help to fix it?

Of course, but what strategies should we adopt after a Kafka cluster failure?

Hisoka-X commented 1 month ago

> Of course, but what strategies should we adopt after a Kafka cluster failure?

The job should fail.

lightzhao commented 1 month ago

> Of course, but what strategies should we adopt after a Kafka cluster failure?
>
> The job should fail.

OK, I will fix it.

lightzhao commented 1 month ago

> Of course, but what strategies should we adopt after a Kafka cluster failure?
>
> The job should fail.

When the Kafka cluster fails, KafkaConsumer.poll does not throw any related exception; it only logs the failure. Therefore, the task side cannot catch an exception and fail the job. It is recommended not to exit the thread and to keep it running.

Hisoka-X commented 1 month ago

> When the Kafka cluster fails, KafkaConsumer.poll does not throw any related exception; it only logs the failure. Therefore, the task side cannot catch an exception and fail the job. It is recommended not to exit the thread and to keep it running.

Ok for me. cc @hailin0