apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Oracle CDC did not retrieve metadata; other tables without partitions are normal #7860

Open · feiyun123-ok opened this issue 4 weeks ago

feiyun123-ok commented 4 weeks ago

What happened

2024-10-16 18:45:56,656 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,

2024-10-16 18:45:56,656 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-10-16 18:45:56,656 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed

2024-10-16 18:45:56,658 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [INTERNAL_ERROR]too many filtered rows

    0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:357
    1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
    2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    5#  start_thread
    6#  __clone

, see more in http://127.0.0.1:8040/api/_load_error_log?file=__shard_27/error_log_insert_stmt_c34c2a972753c81e-d58371e04dfa2fb2_c34c2a972753c81e_d58371e04dfa2fb2 at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:292) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:74) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717) at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [INTERNAL_ERROR]too many filtered rows

    0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:357
    1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
    2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    5#  start_thread
    6#  __clone

, see more in http://127.0.0.1:8040/api/_load_error_log?file=__shard_27/error_log_insert_stmt_c34c2a972753c81e-d58371e04dfa2fb2_c34c2a972753c81e_d58371e04dfa2fb2 at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208) ... 16 more Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [INTERNAL_ERROR]too many filtered rows

    0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:357
    1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
    2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    5#  start_thread
    6#  __clone

, see more in http://127.0.0.1:8040/api/_load_error_log?file=__shard_27/error_log_insert_stmt_c34c2a972753c81e-d58371e04dfa2fb2_c34c2a972753c81e_d58371e04dfa2fb2 at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ... 17 more Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [INTERNAL_ERROR]too many filtered rows

    0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:357
    1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
    2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    5#  start_thread
    6#  __clone

, see more in http://127.0.0.1:8040/api/_load_error_log?file=__shard_27/error_log_insert_stmt_c34c2a972753c81e-d58371e04dfa2fb2_c34c2a972753c81e_d58371e04dfa2fb2 at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:162) at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:143) at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ... 5 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:203)
    ... 2 more

2024-10-16 18:45:56,658 ERROR [o.a.s.c.s.SeaTunnel ] [main] -

SeaTunnel Version

2.3.7

SeaTunnel Config

#### Inventory sync
env {
  parallelism = 6
  job.mode = "STREAMING"
  checkpoint.interval = 1000
}

source {
    # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
    Oracle-CDC {
        result_table_name = "customers"
        skip_analyze = true 
        username = ""
        password = "#"
        database-names = [""]
        schema-names = [""]
        table-names = [""]
        base-url = "jdbc:oracle:thin:@ip:1521:orcl"
        source.reader.close.timeout = 120000
    }
}

transform {
}

sink {
  Doris {
    fenodes = "ip:8030"
    username = ""
    password = ""
    database = "demo"
    table = "${table_name}"
    sink.label-prefix = "oracle-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/ods_erp_oracleCDC.conf -m local

Error Exception

org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [INTERNAL_ERROR]too many filtered rows

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response

CosmosNi commented 4 weeks ago

This is usually an error returned by Doris. Open the URL in the error message for details; the table structure may be incorrect.
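For context, Doris fails a whole stream load with "too many filtered rows" when the share of rows it had to filter out (for example, values that do not fit the column) exceeds the load's `max_filter_ratio`, which defaults to 0. The sketch below models that rule as described in the Doris load documentation, assuming the ratio is computed as filtered / (filtered + loaded); it is an illustration, not Doris or SeaTunnel code, and `tooManyFilteredRows` is a hypothetical name.

```java
// Hypothetical model of the Doris "too many filtered rows" check; not Doris source code.
class FilterRatioCheck {
    // Assumed rule: fail when filtered / (filtered + loaded) > max_filter_ratio.
    static boolean tooManyFilteredRows(long filteredRows, long loadedRows, double maxFilterRatio) {
        long total = filteredRows + loadedRows;
        if (total == 0) {
            return false; // nothing was processed, so nothing to reject
        }
        return (double) filteredRows / total > maxFilterRatio;
    }

    public static void main(String[] args) {
        // With the default ratio of 0, a single bad row fails the whole batch.
        System.out.println(tooManyFilteredRows(1, 9999, 0.0));  // true
        // Tolerating up to 1% bad rows lets the same batch through.
        System.out.println(tooManyFilteredRows(1, 9999, 0.01)); // false
    }
}
```

With the default ratio, one oversized value is enough to reject the batch, which is consistent with the REMARK error in the next comment.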

feiyun123-ok commented 4 weeks ago

Reason: column_name[REMARK], the length of input is too long than schema. first 32 bytes of input str: [用户办理新开5G199套餐,] ("user subscribed to a new 5G199 plan,") schema length: 500; actual length: 585; . src line [];
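The numbers in this message point to a byte-length limit: Doris measures `VARCHAR(M)` in bytes, and in UTF-8 each Chinese character takes 3 bytes, so text that fits the source column can still exceed the byte limit of the Doris column (for example, if the Oracle column counts characters, or bytes in a 2-byte Chinese character set; the Oracle definition of REMARK is not shown in this issue). The sketch below only illustrates the character-versus-byte difference using the prefix quoted in the error; `utf8Bytes` and `fitsVarcharBytes` are hypothetical helpers.

```java
import java.nio.charset.StandardCharsets;

class Utf8LengthCheck {
    // Number of bytes the string occupies when encoded as UTF-8.
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Hypothetical pre-check: would the value fit a Doris VARCHAR(maxBytes) column?
    static boolean fitsVarcharBytes(String s, int maxBytes) {
        return utf8Bytes(s) <= maxBytes;
    }

    public static void main(String[] args) {
        // The REMARK prefix quoted in the Doris error (without the trailing comma),
        // written with Unicode escapes so the file compiles under any source encoding.
        String remark = "\u7528\u6237\u529E\u7406\u65B0\u5F005G199\u5957\u9910";
        System.out.println("chars = " + remark.length());   // 13
        System.out.println("bytes = " + utf8Bytes(remark)); // 29: 8 CJK chars x 3 + 5 ASCII
        System.out.println("fits VARCHAR(20)? " + fitsVarcharBytes(remark, 20)); // false
    }
}
```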

feiyun123-ok commented 4 weeks ago

Now there is another problem: synchronizing a large partitioned table. The table in Doris is created successfully, but its partitions are not, so the data cannot be synchronized.

[Screenshot (微信图片_20241017113024, a WeChat image) not reproduced here]

CosmosNi commented 4 weeks ago

Doris supports dynamic partitions.
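Doris dynamic partitioning keeps a rolling window of range partitions that Doris creates ahead of time on its own, so the target table can be partitioned even when the sync tool creates no partitions itself. The sketch below shows the names and ranges such a window would contain, assuming `dynamic_partition.time_unit = DAY`, `dynamic_partition.prefix = p`, `dynamic_partition.end = 3`, and a `yyyyMMdd` name suffix; these values and the `preCreated` helper are illustrative assumptions, so check the Doris dynamic partition documentation for the exact rules in your version.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of day-based dynamic partitions; not Doris source code.
class DynamicPartitionSketch {
    private static final DateTimeFormatter DAY_SUFFIX = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Partitions assumed to be pre-created for today .. today + end, each covering [day, day + 1).
    static List<String> preCreated(LocalDate today, int end, String prefix) {
        List<String> out = new ArrayList<>();
        for (int offset = 0; offset <= end; offset++) {
            LocalDate lower = today.plusDays(offset);
            out.add(prefix + lower.format(DAY_SUFFIX) + ": [" + lower + ", " + lower.plusDays(1) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints p20241017: [2024-10-17, 2024-10-18) through p20241020: [2024-10-20, 2024-10-21)
        preCreated(LocalDate.of(2024, 10, 17), 3, "p").forEach(System.out::println);
    }
}
```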

feiyun123-ok commented 4 weeks ago

I can create a dynamic-partition table in Doris, but every time SeaTunnel synchronizes this table it reports the following error and cannot continue: Multiple parsing errors io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement

CosmosNi commented 4 weeks ago

Which CDC source are you using?

feiyun123-ok commented 4 weeks ago

Oracle-CDC, as shown in the SeaTunnel Config section of this issue.

feiyun123-ok commented 4 weeks ago

SeaTunnel Version 2.3.7

CosmosNi commented 4 weeks ago

Please provide the stack trace and the SeaTunnel configuration. Also try setting schema_save_mode = IGNORE and creating the table in Doris manually.

feiyun123-ok commented 4 weeks ago

schema_save_mode [Enum]: before the synchronization task starts, selects how to handle the existing table structure on the target side. Options:
- RECREATE_SCHEMA: create the table if it does not exist; drop and recreate it if it exists.
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the table if it does not exist; skip it if it exists.
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error if the table does not exist.

The default value is CREATE_SCHEMA_WHEN_NOT_EXIST.
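Read as a decision table, the options above (plus the IGNORE value suggested earlier in this thread) behave roughly as sketched below. This is a hypothetical model written from the quoted description, not SeaTunnel's implementation; the `SaveMode` and `TableAction` enums and the `decide` method are illustrative names, and IGNORE is assumed to mean that SeaTunnel leaves table handling entirely to the user.

```java
// Hypothetical model of the documented schema_save_mode behaviour; not SeaTunnel source code.
enum SaveMode { RECREATE_SCHEMA, CREATE_SCHEMA_WHEN_NOT_EXIST, ERROR_WHEN_SCHEMA_NOT_EXIST, IGNORE }

enum TableAction { CREATE, DROP_AND_RECREATE, KEEP_EXISTING, FAIL, DO_NOTHING }

class SaveModeDemo {
    static TableAction decide(SaveMode mode, boolean tableExists) {
        switch (mode) {
            case RECREATE_SCHEMA:
                return tableExists ? TableAction.DROP_AND_RECREATE : TableAction.CREATE;
            case CREATE_SCHEMA_WHEN_NOT_EXIST:
                return tableExists ? TableAction.KEEP_EXISTING : TableAction.CREATE;
            case ERROR_WHEN_SCHEMA_NOT_EXIST:
                return tableExists ? TableAction.KEEP_EXISTING : TableAction.FAIL;
            case IGNORE:
            default:
                // Assumed: no table handling at all; the user manages the target table.
                return TableAction.DO_NOTHING;
        }
    }

    public static void main(String[] args) {
        for (SaveMode mode : SaveMode.values()) {
            System.out.println(mode + ": exists -> " + decide(mode, true)
                    + ", missing -> " + decide(mode, false));
        }
    }
}
```

Under this reading, a Doris table created manually beforehand is left as-is by both the default mode and IGNORE; the two differ only when the table is missing.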

Oracle test table DDL:

CREATE TABLE T_TEST_PARTITION (
    sale_id NUMBER,
    product_id NUMBER,
    sale_date DATE,
    amount NUMBER
)
PARTITION BY RANGE (sale_date) (
    PARTITION sales_1 VALUES LESS THAN (TO_DATE('2023-01-01', 'YYYY-MM-DD')),
    PARTITION sales_2 VALUES LESS THAN (TO_DATE('2024-01-01', 'YYYY-MM-DD')),
    PARTITION sales_3 VALUES LESS THAN (TO_DATE('2025-01-01', 'YYYY-MM-DD')),
    PARTITION sales_4 VALUES LESS THAN (TO_DATE('2026-01-01', 'YYYY-MM-DD'))
);
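For readers less familiar with Oracle range partitioning: each `VALUES LESS THAN` bound is exclusive, and a row goes to the first partition whose bound is greater than its partition key, so `sales_2` holds 2023 dates and `sales_4` holds 2025 dates. The sketch below mimics that routing for `sale_date`; `partitionFor` is a hypothetical helper, and it treats `sale_date` as a calendar date, although an Oracle `DATE` also carries a time of day.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch of RANGE ... VALUES LESS THAN routing; not Oracle code.
class RangePartitionDemo {
    // Exclusive upper bounds from the T_TEST_PARTITION DDL, in ascending order.
    static final Map<String, LocalDate> BOUNDS = new LinkedHashMap<>();
    static {
        BOUNDS.put("sales_1", LocalDate.of(2023, 1, 1));
        BOUNDS.put("sales_2", LocalDate.of(2024, 1, 1));
        BOUNDS.put("sales_3", LocalDate.of(2025, 1, 1));
        BOUNDS.put("sales_4", LocalDate.of(2026, 1, 1));
    }

    // First partition whose bound is strictly greater than the key.
    static Optional<String> partitionFor(LocalDate saleDate) {
        for (Map.Entry<String, LocalDate> e : BOUNDS.entrySet()) {
            if (saleDate.isBefore(e.getValue())) {
                return Optional.of(e.getKey());
            }
        }
        // No partition covers the key; Oracle rejects such an insert (ORA-14400).
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(LocalDate.of(2022, 6, 30))); // Optional[sales_1]
        System.out.println(partitionFor(LocalDate.of(2023, 1, 1)));  // Optional[sales_2]
        System.out.println(partitionFor(LocalDate.of(2026, 3, 1)));  // Optional.empty
    }
}
```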

Then add a primary key and insert data.

Oracle CDC did not retrieve the metadata for this table, while other tables without partitions sync normally.

CosmosNi commented 3 weeks ago

I tried V2.3.8 and it seems to work. Could the problem be that no primary key is specified?

feiyun123-ok commented 3 weeks ago

I did create the primary key. I will try 2.3.8 later and report back.

feiyun123-ok commented 3 weeks ago

I have tried V2.3.8; the problem is the same.

The new problem is as follows:

2024-10-24 14:47:21,163 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,

2024-10-24 14:47:21,163 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-10-24 14:47:21,163 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed

2024-10-24 14:47:21,165 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)

Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693) at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018) at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) ... 5 more

Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:264) at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask$RedoLogSplitReadTask.execute(OracleRedoLogFetchTask.java:147) at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask.execute(OracleRedoLogFetchTask.java:73) at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:106) ... 5 more

Caused by: java.sql.SQLException: ORA-18716: {0} is not in any time zone. DATE at oracle.jdbc.driver.TimestampAccessor.getOffsetDateTime(TimestampAccessor.java:158) at oracle.jdbc.driver.Redirector$33.redirect(Redirector.java:691) at oracle.jdbc.driver.Redirector$33.redirect(Redirector.java:686) at oracle.jdbc.driver.Representation.getObject(Representation.java:564) at oracle.jdbc.driver.Accessor.getObject(Accessor.java:993) at oracle.jdbc.driver.OracleStatement.getObject(OracleStatement.java:6721) at oracle.jdbc.driver.InsensitiveScrollableResultSet.getObject(InsensitiveScrollableResultSet.java:608) at io.debezium.connector.oracle.OracleConnection.lambda$getScnToTimestamp$16(OracleConnection.java:502) at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) at io.debezium.connector.oracle.OracleConnection.getScnToTimestamp(OracleConnection.java:501) at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.calculateEndScn(LogMinerStreamingChangeEventSource.java:802) at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:200) ... 8 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
    ... 2 more

2024-10-24 14:47:21,165 ERROR [o.a.s.c.s.SeaTunnel ] [main] -

feiyun123-ok commented 2 weeks ago

SeaTunnel is a good tool, but its error messages do not make the root cause clear. It would be good to improve them.

CosmosNi commented 1 week ago

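ORA-18716: {0} 不在任何时区中 ("{0} is not in any time zone"). This seems to be an error reported by Oracle; the innermost frames are in the Oracle JDBC driver (oracle.jdbc.driver.TimestampAccessor).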
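In the trace, the failure is raised in `TimestampAccessor.getOffsetDateTime`, reached from Debezium's `OracleConnection.getScnToTimestamp`; it appears that a value typed as `DATE` was being converted to an `OffsetDateTime`, which needs a time zone the value does not carry. As a plain `java.time` analogy only (it does not reproduce the driver's code or suggest a fix), a zone-less `LocalDateTime` likewise cannot become an `OffsetDateTime` unless a zone is supplied explicitly. `Asia/Shanghai` below is an arbitrary example zone, not a value taken from this issue, and both helper names are hypothetical.

```java
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Analogy for "not in any time zone": zone-less values need an explicit zone.
class ZonelessConversionDemo {
    // Fails: a LocalDateTime has no offset, so OffsetDateTime.from cannot build one.
    static OffsetDateTime toOffsetWithoutZone(LocalDateTime value) {
        return OffsetDateTime.from(value);
    }

    // Works: the caller states which zone the wall-clock value belongs to.
    static OffsetDateTime toOffsetWithZone(LocalDateTime value, ZoneId zone) {
        return value.atZone(zone).toOffsetDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2024, 10, 24, 14, 47, 21);
        try {
            toOffsetWithoutZone(value);
        } catch (DateTimeException e) {
            System.out.println("no zone: " + e.getClass().getSimpleName());
        }
        // Prints: with zone: 2024-10-24T14:47:21+08:00
        System.out.println("with zone: " + toOffsetWithZone(value, ZoneId.of("Asia/Shanghai")));
    }
}
```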