apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

[Bug] When Flink uses DorisStreamLoad, an exception "Encountered unqualified data" occurs, but the error url is not returned and the data cannot be located. #36506

Open Mou-arch opened 2 weeks ago

Mou-arch commented 2 weeks ago

Search before asking

Version

version: 2.1.4

What's Wrong?

When I use Flink CDC to import MongoDB data into Doris, the job starts fine and runs for a while. Then Stream Load returns an error (my guess is a problem with the metadata), but it does not return the error log URL, so the specific rows that caused the error cannot be located.

09:21:43,729 WARN org.apache.flink.runtime.taskmanager.Task [] - ChangelogNormalize[3] -> xxx_stats_cdc[4]: Writer -> xxx_stats_cdc[4]: Committer (1/20)#0 (d2e691a388e0657aeee23073662b7adc_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Could not perform checkpoint 7 for operator ChangelogNormalize[3] -> xxx_stats_cdc[4]: Writer -> xxx_stats_cdc[4]: Committer (1/20)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-runtime-1.19.0.jar:1.19.0]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: table xxxdb.xxx_stats_cdc stream load error: [CANCELLED]Encountered unqualified data, stop processing, see more in null
    at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:274) ~[flink-doris-connector-1.19-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:323) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    ... 22 more
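
The trailing "see more in null" in the exception suggests that the Stream Load response carried no error URL at all. As a rough illustration only, here is a minimal Python sketch of summarizing such a response. It assumes the response is a JSON object with the `Status`, `Message`, `Label` and `ErrorURL` fields described in the Doris Stream Load documentation; the function name and the sample payload are hypothetical and are not taken from the connector's source.

```python
import json


def describe_stream_load_failure(response_text):
    """Summarize a Stream Load response; return None if the load succeeded.

    Assumption: the response is JSON with "Status", "Message", "Label"
    and an optional "ErrorURL" field. Any status other than "Success"
    is treated as a failure in this sketch.
    """
    resp = json.loads(response_text)
    if resp.get("Status") == "Success":
        return None
    message = resp.get("Message", "unknown error")
    error_url = resp.get("ErrorURL")  # may be missing, as in this issue
    if error_url:
        return f"{message}; rejected rows: {error_url}"
    return f"{message}; no ErrorURL returned for label {resp.get('Label')}"


# Hypothetical payload shaped like the failure reported above.
sample = json.dumps({
    "Status": "Fail",
    "Label": "demo_label",
    "Message": "[CANCELLED]Encountered unqualified data, stop processing",
})
print(describe_stream_load_failure(sample))
```

Handling the missing field explicitly at least tells the reader which label to look up, instead of printing a literal null.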

What You Expected?

Return the error URL so that developers can troubleshoot the rejected data. It would also help to provide a query statement for the error log. I tried SHOW STREAM LOAD where Label = "xxx", but the URL was N/A. I then checked be.WARNING.log.20240613-164831, but the record contains only a brief description and no specific error message, so it does not help me troubleshoot. It looks like the following:

W20240619 09:21:43.803699 663722 stream_load_executor.cpp:100] fragment execute failed, err_msg=[CANCELLED]Encountered unqualified data, stop processing, id=3643c7416dac2a8c-0d768e7fd6259ab9, job_id=-1, txn_id=65132, label=shop_stats_cdc_9eWyMeuCC1DnzZoUOvWk079weHsiso5N_xxxx_stats_cdc_5_7_0562b13f-5419-4e08-9c12-4fc494be8802, elapse(s)=31
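
For anyone repeating this lookup, the BE warning line can be split into its fields mechanically. The following Python sketch is only an illustration: it assumes the `key=value` layout of the single log line quoted above, and the helper name `parse_be_warning` is hypothetical. It extracts the label and rebuilds the SHOW STREAM LOAD query used in this report.

```python
import re

# The BE warning line quoted in this issue, copied verbatim.
BE_LOG_LINE = (
    "W20240619 09:21:43.803699 663722 stream_load_executor.cpp:100] "
    "fragment execute failed, "
    "err_msg=[CANCELLED]Encountered unqualified data, stop processing, "
    "id=3643c7416dac2a8c-0d768e7fd6259ab9, job_id=-1, txn_id=65132, "
    "label=shop_stats_cdc_9eWyMeuCC1DnzZoUOvWk079weHsiso5N_xxxx_stats_cdc"
    "_5_7_0562b13f-5419-4e08-9c12-4fc494be8802, elapse(s)=31"
)

# Keys are assumed from this one sample line, not from the BE source.
KEYS = ("err_msg", "id", "job_id", "txn_id", "label", "elapse(s)")
KEY_PATTERN = re.compile(r", (" + "|".join(re.escape(k) for k in KEYS) + r")=")


def parse_be_warning(line):
    """Split the line on known ', key=' markers so that commas inside
    err_msg do not break the values apart."""
    matches = list(KEY_PATTERN.finditer(line))
    fields = {}
    for i, m in enumerate(matches):
        end = matches[i + 1].start() if i + 1 < len(matches) else len(line)
        fields[m.group(1)] = line[m.end():end].strip()
    return fields


fields = parse_be_warning(BE_LOG_LINE)
query = f'SHOW STREAM LOAD where Label = "{fields["label"]}";'
print(fields["txn_id"], fields["err_msg"])
print(query)
```

As described above, in this case the query returns N/A for the URL, so the sketch only automates the lookup; it does not recover the missing error detail.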

With a large data set there is no way to locate the offending rows, which means the import into Doris cannot continue. We hope Doris can provide a convenient and stable way to query load errors.

How to Reproduce?

Because I couldn't query the error, I don't actually know which record caused the problem. The job is just a simple CDC synchronization that reads from MongoDB and inserts into Doris.

Anything Else?

No response

Are you willing to submit PR?

Code of Conduct

brianumbrellaus commented 2 weeks ago

Temporary solution:
1. Try increasing the VARCHAR field length to 65533.
2. Try setting sink.properties.column_separator to '|' and sink.properties.line_delimiter to '//n//r'.
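
To make the second suggestion concrete, here is a small Python sketch that renders the sink options as a Flink SQL WITH clause. The option keys are the ones the Flink Doris Connector documents (`connector`, `fenodes`, `table.identifier`, `username`, `password`, `sink.label-prefix`, `sink.properties.*`). The FE address, credentials and label prefix are placeholders, and the delimiter values are copied verbatim from the comment above without having been verified. The helper name `render_with_clause` is hypothetical.

```python
def render_with_clause(options):
    """Render connector options as a Flink SQL WITH (...) clause."""
    lines = []
    for key, value in options.items():
        escaped = value.replace("'", "''")  # double single quotes for SQL
        lines.append(f"  '{key}' = '{escaped}'")
    return "WITH (\n" + ",\n".join(lines) + "\n)"


sink_options = {
    "connector": "doris",
    "fenodes": "127.0.0.1:8030",                 # placeholder FE address
    "table.identifier": "xxxdb.xxx_stats_cdc",   # table named in the stack trace
    "username": "root",                          # placeholder
    "password": "",                              # placeholder
    "sink.label-prefix": "xxx_stats_cdc",        # placeholder
    # Values below are taken as written in the comment, unverified.
    "sink.properties.column_separator": "|",
    "sink.properties.line_delimiter": "//n//r",
}

print(render_with_clause(sink_options))
```

The printed clause can be appended to the CREATE TABLE statement of the Doris sink table. The first suggestion (a longer VARCHAR) is a change to the Doris table schema and is not covered here.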