apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

[Bug] flink-sql-connector-mysql-cdc: exception when syncing from MySQL to Doris #30171

Closed: chenhongbinjs closed this 10 months ago

chenhongbinjs commented 10 months ago

Search before asking

Version

doris: apache-doris-2.0.3-bin-x64-noavx2
flink: flink-1.18.0
mysql: 8.0.29

libs: flink-sql-connector-mysql-cdc-3.0.0.jar, flink-doris-connector-1.18-1.5.1.jar, mysql-connector-j-8.0.32.jar

What's Wrong?

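When I use Flink CDC to sync MySQL to Doris, an exception occurs after part of the data has already been synced successfully. Details are below.

Update: I found the cause. The column type in MySQL is mediumtext, but the table that Doris created automatically uses the text type. What should I do?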
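To see why a mediumtext column can break the load, compare sizes: MySQL MEDIUMTEXT holds up to 16,777,215 bytes, while Doris appears to reject string values longer than a BE-side soft limit, so oversized rows are filtered and the stream load fails with "too many filtered rows". The sketch below assumes that limit defaults to 1 MiB (an assumption; check the BE configuration for your version) and flags source values whose UTF-8 size would exceed it. `rows_over_limit` and `ASSUMED_DORIS_SOFT_LIMIT_BYTES` are hypothetical names used only for illustration.

```python
# Assumption: Doris' string soft limit defaults to 1 MiB; verify on your BEs.
ASSUMED_DORIS_SOFT_LIMIT_BYTES = 1 * 1024 * 1024
# MySQL MEDIUMTEXT can store up to 2^24 - 1 bytes.
MEDIUMTEXT_MAX_BYTES = 2**24 - 1


def rows_over_limit(values, limit_bytes=ASSUMED_DORIS_SOFT_LIMIT_BYTES):
    """Return the indexes of values whose UTF-8 encoded size exceeds limit_bytes.

    NULLs (None) are skipped, since they carry no string payload.
    """
    return [
        i
        for i, v in enumerate(values)
        if v is not None and len(v.encode("utf-8")) > limit_bytes
    ]


if __name__ == "__main__":
    sample = [
        "short",                 # a few bytes, fine
        "x" * (2 * 1024 * 1024), # 2 MiB of ASCII, over the assumed limit
        None,                    # NULL, skipped
        "中" * 400_000,          # 3 bytes per char in UTF-8, about 1.2 MB
    ]
    print(rows_over_limit(sample))  # indexes 1 and 3 exceed the limit
```

Running a check like this against the largest values in the source column gives a rough idea of how high the Doris limit needs to be.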

Sync script:

bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.18-1.5.1.jar \
    mysql-sync-database \
    --database ods_db_df63580101 \
    --mysql-conf hostname=192.168.19.151 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=*** \
    --mysql-conf database-name=dbdf63580101 \
    --table-prefix "ods" \
    --including-tables "t_lawsuits_details" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password= \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label_20240119_ods_db_df63580101 \
    --table-conf replication_num=1

Exception:

2024-01-20 19:53:01
java.lang.Exception: Could not perform checkpoint 10761 for operator Source: MySQL Source -> Process -> ods_t_lawsuits_details: Writer -> ods_t_lawsuits_details: Committer (1/1)#9236.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: tabel {} stream load error: ods_db_df63580101.ods_t_lawsuits_details, see more in [INTERNAL_ERROR]too many filtered rows

0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /root/src/doris-2.0/be/src/common/status.h:354
1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
3#  doris::ThreadPool::dispatch_thread() at /root/src/doris-2.0/be/src/util/threadpool.cpp:0
4#  doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:562
5#  start_thread
6#  __clone

at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:232)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:199)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1191)
... 14 more

What You Expected?

I would like to know the cause and how to fix it.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

Code of Conduct

chenhongbinjs commented 10 months ago

It turned out that the string length is limited. Changing string_type_length_soft_limit fixed the problem.
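A minimal sketch of applying that change at runtime, under two assumptions to verify in the Doris BE configuration docs for your version: that the full option name is `string_type_length_soft_limit_bytes`, and that it can be changed through the BE HTTP endpoint `/api/update_config` (with `persist=true` saving it across restarts). The helper `build_update_config_url` is hypothetical, and port 8040 is assumed to be the BE webserver port. The code only builds the URL; you would send it with `curl -X POST` to every BE, or set the option in `be.conf` and restart the BEs instead.

```python
from urllib.parse import urlencode

# Assumed BE option name; confirm it against your Doris version's docs.
CONFIG_KEY = "string_type_length_soft_limit_bytes"
# MySQL MEDIUMTEXT maximum size, a natural upper target for the new limit.
MEDIUMTEXT_MAX_BYTES = 2**24 - 1


def build_update_config_url(be_host, http_port, limit_bytes, persist=True):
    """Build a BE update_config URL that sets the string soft limit.

    Raises ValueError for non-positive limits.
    """
    if limit_bytes <= 0:
        raise ValueError("limit_bytes must be positive")
    params = {CONFIG_KEY: limit_bytes}
    if persist:
        # Assumed flag: ask the BE to keep the value after a restart.
        params["persist"] = "true"
    return f"http://{be_host}:{http_port}/api/update_config?{urlencode(params)}"


if __name__ == "__main__":
    # Hypothetical BE address; repeat for every BE in the cluster.
    print(build_update_config_url("127.0.0.1", 8040, MEDIUMTEXT_MAX_BYTES))
```

Sizing the limit to the MEDIUMTEXT maximum covers any value the source column can hold, at the cost of allowing very large strings in Doris; a smaller value based on the real data may be preferable.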