apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0

[Bug] retry infinite 2pc #80

Open rafael81 opened 1 year ago

rafael81 commented 1 year ago

Version

1.15, with sink.max-retries = 1

What's Wrong?

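The two-phase commit (2PC) is retried indefinitely. The task restarts, the commit fails again with "transaction [49734] not found", and the cycle repeats: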


2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 (019990f47bcb3edc3ef9a00232143186).
2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 019990f47bcb3edc3ef9a00232143186.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from CREATED to DEPLOYING.
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) [DEPLOYING].
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3d7d4ae1
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,021 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:49,021 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
    at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
    at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2).
2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 0eedf3500794baa86ab2fdfcbefeb1c2.
2022-11-18 11:53:50,025 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from CREATED to DEPLOYING.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) [DEPLOYING].
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3c9058a4
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,033 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:50,033 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
    at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
    at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

What You Expected?

The connector should not retry the 2PC commit indefinitely when the transaction cannot be found in the Doris BE.
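The expected behaviour could be sketched roughly as follows. This is a minimal illustration, not the connector's code: `BoundedCommit`, `commitWithRetries`, and the `Supplier<Boolean>` stand-in for the commit call are hypothetical names, and `maxRetries` is assumed to mean the number of extra attempts after the first one.

```java
import java.util.function.Supplier;

// Hypothetical sketch: none of these names exist in doris-flink-connector.
final class BoundedCommit {

    /**
     * Calls commit until it reports success, making at most
     * 1 + maxRetries attempts. Returns the number of attempts used.
     */
    static int commitWithRetries(Supplier<Boolean> commit, int maxRetries) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            if (commit.get()) {
                return attempts;
            }
        }
        // Give up instead of looping forever.
        throw new IllegalStateException("commit failed after " + attempts + " attempts");
    }

    public static void main(String[] args) {
        try {
            // A commit that never succeeds, as in the log above.
            commitWithRetries(() -> false, 1);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that in the log above the attempt counter (#65212, #65213, #65214) increases with each task restart, so the loop may be driven by task restarts rather than by a retry loop inside the sink; the sketch only illustrates the bounded behaviour being asked for.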

How to Reproduce?

You can reproduce this by killing the Doris backend while Flink is sinking data.

Anything Else?

No response

doubly-yi commented 1 year ago

I had a similar problem: I did not want the sink to keep retrying after a failed write, but setting the maximum number of retries had no effect.

// Note: this builder is configured but never passed to the sink below.
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setMaxRetries(2);

DorisSink.sink(
        DorisReadOptions.builder().build(),
        DorisExecutionOptions.builder()
                //.setBatchSize(1)
                //.setBatchIntervalMs(0L)
                .setMaxRetries(1)
                .setStreamLoadProp(properties)
                .build(),
        DorisOptions.builder()
                .setFenodes(feNodes)
                .setTableIdentifier(tableIdentifier)
                .setUsername(username)
                .setPassword(password)
                .build()
);

link3280 commented 1 year ago

Hi, we ran into the same issue. Anything new?

JNSimba commented 2 months ago

This happens during the commit phase. The transaction ID recorded in the checkpoint has expired on the FE side, so committing it again produces the error above, and the job can no longer be restored from that checkpoint. You can extend the expiration time by changing the streaming_label_keep_max_second setting in fe.conf; the default is 12 hours.
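As a rough illustration of the expiry reasoning, the check below estimates whether a checkpoint is older than the retention window. It is only a sketch: `TxnExpiryCheck` and `likelyExpired` are hypothetical names, and measuring the window from the checkpoint time is an assumption, since the FE may count from a different moment.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: not part of Doris or the Flink connector.
final class TxnExpiryCheck {

    // Default retention mentioned above: 12 hours.
    static final Duration DEFAULT_KEEP = Duration.ofHours(12);

    /** Returns true when the checkpoint is older than the retention window. */
    static boolean likelyExpired(Instant checkpointTime, Instant now, Duration keep) {
        return Duration.between(checkpointTime, now).compareTo(keep) > 0;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2022-11-18T11:53:49Z");
        // Checkpoint taken 13 hours ago: outside the 12-hour window.
        System.out.println(likelyExpired(now.minus(Duration.ofHours(13)), now, DEFAULT_KEEP));
        // Checkpoint taken 1 hour ago: still inside the window.
        System.out.println(likelyExpired(now.minus(Duration.ofHours(1)), now, DEFAULT_KEEP));
    }
}
```

If such a check reports that the checkpoint is too old, restoring from it would be expected to hit the "transaction not found" error again.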

By the way, the latest version also supports the sink.ignore.commit-error property. If you have confirmed that the transaction succeeded, you can use it to skip the error.
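The effect of such a flag can be sketched as below. This is not the connector's implementation: `CommitErrorPolicy`, `missingTxnId`, and `shouldFailTask` are hypothetical names, and limiting the skip to "transaction not found" responses is an assumption made for the example; the real property may behave differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: the names below are not from doris-flink-connector.
final class CommitErrorPolicy {

    // Matches the message format shown in the log above.
    private static final Pattern TXN_NOT_FOUND =
            Pattern.compile("transaction \\[(\\d+)\\] not found");

    /** Returns the transaction ID if the message reports a missing transaction, else -1. */
    static long missingTxnId(String msg) {
        Matcher m = TXN_NOT_FOUND.matcher(msg);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /**
     * Decides whether a failed commit should fail the task.
     * Assumption: only "transaction not found" errors are skipped.
     */
    static boolean shouldFailTask(String msg, boolean ignoreCommitError) {
        boolean skippable = ignoreCommitError && missingTxnId(msg) >= 0;
        return !skippable;
    }

    public static void main(String[] args) {
        String msg = "errCode = 2, detailMessage = transaction [49734] not found";
        System.out.println(missingTxnId(msg));          // 49734
        System.out.println(shouldFailTask(msg, false)); // true
        System.out.println(shouldFailTask(msg, true));  // false
    }
}
```

Skipping the error is only safe when the transaction is known to have been committed already; otherwise the data written in that transaction is lost without any failure being reported.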