apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

mysql-cdc To Doris #5637

Open tracyliuzw opened 1 year ago

tracyliuzw commented 1 year ago

Search before asking

What happened

During the test, an exception occurred at the very end of synchronizing the mysql-cdc historical data; it looks like the job crashes during the final synchronization and the historical data from that last batch is lost.

SeaTunnel Version

2.3.3

SeaTunnel Config

env {
  # You can set flink configuration here
  execution.parallelism = 10
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
    MySQL-CDC {
        incremental.parallelism = 10
        server-id = 6002
        username = "doris"
        password = "**************"
        database-names = ["ad_stat_data"]
        table-names = ["ad_stat_data.t_presentee_stat_world_112001"]
        base-url = "jdbc:mysql://10.246.50.113:3310/ad_stat_data"
        startup.mode = "initial"
        snapshot.split.size = 8096
        snapshot.fetch.size = 5000
  }
}

sink {
    Doris {
        fenodes = "10.246.98.111:8030"
        username = root
        password = "Bigdata@igg123456"
        table.identifier = "ad_stat_data.t_presentee_stat_world_112001"
        sink.enable-2pc = "true"
        sink.label-prefix = "json"
        sink.enable-delete = true
        doris.config = {
            format="json"
            read_json_by_line="true"
            merge_type = "MERGE"
            delete = "__DORIS_DELETE_SIGN__=1"
            enable_profile = "true"
        }
    }
}

Running Command

/usr/local/seatunnel/bin/seatunnel.sh -cn seatunnel-igg -n 112001Tokafka -c 112001Tokafka.conf  --async true

Error Exception

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error, 

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed 

2023-10-16 03:22:27,418 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:190)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:223)
    at org.apache.seatunnel.engine.server.task.context.SourceReaderContext.sendSplitRequest(SourceReaderContext.java:64)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:140)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:105)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:110)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
    at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:220)
    ... 19 more
Caused by: java.lang.NullPointerException
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.lambda$createIncrementalSplit$4(IncrementalSplitAssigner.java:203)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1652)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.createIncrementalSplit(IncrementalSplitAssigner.java:205)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.createIncrementalSplits(IncrementalSplitAssigner.java:193)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.getNext(IncrementalSplitAssigner.java:102)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner.getNext(HybridSplitAssigner.java:105)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:160)
    at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:81)
    at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.requestSplit(SourceSplitEnumeratorTask.java:231)
    at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.lambda$run$0(RequestSplitOperation.java:62)
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
    at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.run(RequestSplitOperation.java:52)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
    at ------ submitted from ------.()
    at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336)
    at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112)
    ... 22 more

    at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:183)
    ... 2 more
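The innermost NullPointerException is thrown from a stream pipeline inside IncrementalSplitAssigner.createIncrementalSplit (line 203), which runs while the enumerator switches from snapshot splits to the incremental (binlog) split. A minimal, hypothetical sketch of that failure pattern is below; the class and field names are illustrative stand-ins, not SeaTunnel's actual code, and the assumption that one completed split carries a null watermark is only one plausible trigger:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: building the incremental split's start offset by
// streaming over the completed snapshot splits. Dereferencing each
// watermark throws a NullPointerException as soon as one entry is null,
// mirroring the stack trace above.
public class SplitWatermarkSketch {

    // Stand-in for a snapshot split's high watermark (a binlog offset).
    public static class Watermark {
        final long offset;
        public Watermark(long offset) { this.offset = offset; }
    }

    public static String buildIncrementalSplit(Map<String, Watermark> completedSplits) {
        try {
            long max = completedSplits.values().stream()
                    .mapToLong(w -> w.offset) // NPE here if a value is null
                    .max()
                    .orElse(-1L);
            return "max watermark: " + max;
        } catch (NullPointerException e) {
            return "NPE while building incremental split";
        }
    }

    public static void main(String[] args) {
        Map<String, Watermark> completedSplits = new HashMap<>();
        completedSplits.put("split-0", new Watermark(100L));
        // Assumed trigger: a split finished without reporting a watermark.
        completedSplits.put("split-1", null);
        System.out.println(buildIncrementalSplit(completedSplits));
    }
}
```

If this is what happens in 2.3.3, any snapshot split whose completion was recorded without a watermark (e.g. after a checkpoint/restore race) would crash the enumerator exactly at the end of the historical phase, which matches the symptom described above.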

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

tracyliuzw commented 1 year ago

Real-time data will not be synchronized; what is the reason for this? My Doris table structure is as follows:

CREATE TABLE t_presentee_stat_world_112001 (
  ps_id bigint(20) NOT NULL COMMENT 'record ID',
  ps_date date NULL COMMENT 'registration date',
  ps_from_id bigint(20) NOT NULL DEFAULT "0" COMMENT 'ad channel (formerly referrer ID); 0 = other source, >0 = ad channel',
  ps_recommend_method bigint(20) NOT NULL DEFAULT "0" COMMENT 'ad campaign (formerly recommendation method)',
  ps_region varchar(20) NULL COMMENT 'region',
  ps_game_id bigint(20) NOT NULL DEFAULT "0" COMMENT 'game ID',
  ps_online_time_3 bigint(20) NOT NULL DEFAULT "0",
  ps_online_time_7 bigint(20) NOT NULL DEFAULT "0",
  ps_online_time_14 bigint(20) NOT NULL DEFAULT "0"
) ENGINE=OLAP
UNIQUE KEY(ps_id, ps_date, ps_from_id, ps_recommend_method, ps_region, ps_game_id, ps_site_id, ps_lang_id, ps_platform_id)
COMMENT 'promotion statistics - world'
DISTRIBUTED BY HASH(ps_id, ps_date, ps_from_id, ps_recommend_method, ps_region, ps_game_id) BUCKETS 10
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "is_being_synced" = "false",
  "storage_format" = "V2",
  "light_schema_change" = "true",
  "disable_auto_compaction" = "false",
  "enable_single_replica_compaction" = "false"
);

tracyliuzw commented 1 year ago

When I modify the table structure, only the historical data is synchronized and real-time data is not. What is the reason for this? My Doris table structure is as shown above.

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

Vampx commented 3 months ago

Who can answer this question?