apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [MONGO-CDC] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask #7520

Open · SonKi-Aurora opened this issue 2 weeks ago

SonKi-Aurora commented 2 weeks ago

What happened

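While using MongoDB-CDC to sync data from MongoDB 3.6 to Doris 2.1, if the MongoDB data goes unchanged for a while, the following exception is thrown as soon as data starts changing again.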
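The exception in the log ends in `java.lang.NumberFormatException: null`, thrown from `Long.parseLong`. On Java 8 (the reporter's runtime), `parseLong` throws exactly this message when it is handed a null string, so the offset being compared most likely had no timestamp value at all rather than a malformed one. A minimal demonstration, independent of SeaTunnel (the class and method names are illustrative only):

```java
public class ParseLongNullDemo {
    // Returns the message of the NumberFormatException that Long.parseLong
    // throws when it is given a null string.
    static String messageForNull() {
        String missingTimestamp = null; // stands in for an absent timestamp value
        try {
            Long.parseLong(missingTimestamp);
            return "no exception";
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // On Java 8 this prints "null", the same text as in the stack trace.
        // Newer JDKs may use a longer message that still contains "null".
        System.out.println(messageForNull());
    }
}
```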

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 1000
}

source {
  MongoDB-CDC {
    hosts = "xx.xx.xx.xx:xx"
    database = ["xx"]
    collection = ["xx.xx"]
    username = xx
    password = xx
    heartbeat.interval.ms = 5000
    schema = {
      fields {
        "_id" : String,
        "source" : String,
        "outSideSource" : String,
        "type" : String,
        "contentType" : String,
        "name" : String,
        "searchName" : String,
        "originName" : String,
        "aliasName" : String,
        "timeLen" : String,
        "zone" : String,
        "language" : String,
        "years" : String,
        "premiereTime" : String,
        "worldPremiere" : String,
        "mainlandPremiere" : String,
        "youkuPremiere" : String,
        "director" : String,
        "scriptWriter" : String,
        "producer" : String,
        "superviser" : String,
        "leadingRole" : String,
        "presenter" : String,
        "associateProducer" : String,
        "otherCrew" : String,
        "openingTheme" : String,
        "endingTheme" : String,
        "episode" : String,
        "roleMapping" : String,
        "tags" : String,
        "poster" : String,
        "updateCycle" : String,
        "score" : String,
        "doubanScore" : String,
        "doubanDetailUrl" : String,
        "playCounts" : String,
        "hot" : String,
        "awards" : String,
        "tvStation" : String,
        "cpCode" : String,
        "cpName" : String,
        "shortDesc" : String,
        "detailsUrl" : String,
        "playUrl" : String,
        "imdbUrl" : String,
        "setCounts" : String,
        "desc" : String,
        "date" : String,
        "creatTime" : String,
        "lastModify" : String,
        "isUpdate" : String,
        "middlePosterAddr" : String,
        "smallPosterAddr" : String,
        "squarePosterAddr" : String,
        "horizontalPoster" : String,
        "isRelatedBase" : Int,
        "relationMode" : String,
        "isCanMakeBase" : Int,
        "checkId" : String,
        "dataProvider" : String,
        "province" : String,
        "provinceEpgIds" : String,
        "isVip" : String,
        "isChecked" : String,
        "jiShu" : String,
        "outSourceId" : String,
        "isCharge" : String,
        "partnerCode" : String,
        "danMuFlag" : Int,
        "bulletCreateDate" : String,
        "commentFlag" : Int,
        "commentCreateDate" : String,
        "elementType" : String,
        "hasHighLight" : String,
        "highLightCreateDate" : String,
        "hasProgram" : String,
        "programCreateDate" : String,
        "hasStartEndPoint" : String,
        "startEndPointCreateDate" : String,
        "yinHeType" : String,
        "statusCode" : String,
        "publisher" : String,
        "updateStatus" : String,
        "isAlone" : String,
        "finishSpiderProgram" : String,
        "hasShotdetRecord" : String
      }
    }
  }
}

sink {
  Doris {
    fenodes = "xx.xx.xx.xx:xx"
    username = xx
    password = "xx"
    table.identifier = "xx.xx"
    sink.label-prefix = "xx_"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/xx.config --async -n xx

Error Exception

2024-08-29 11:10:10,595 WARN  [c.m.s.f.MongodbStreamFetchTask] [debezium-reader-0] - Cannot extract clusterTime from change stream event, fallback to current timestamp.
2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.RecordBuffer    ] [st-multi-table-sink-writer-2] - start buffer data, read queue size 0, write queue size 3
2024-08-29 11:10:11,166 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@2775d124
java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:552) ~[?:1.8.0_381]
        at java.lang.Long.parseLong(Long.java:631) ~[?:1.8.0_381]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.getTimestamp(ChangeStreamOffset.java:70) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:92) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:37) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.offset.Offset.isAtOrAfter(Offset.java:71) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState.markEnterPureIncrementPhaseIfNeed(IncrementalSplitState.java:82) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.markEnterPureIncrementPhase(IncrementalSourceRecordEmitter.java:164) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039) ~[seatunnel-starter.jar:2.3.7]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_381]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_381]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_381]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_381]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [st-multi-table-sink-writer-2] - stream load started for sourcePS__vgs_sourcePS_881361616757587971_0_2619
2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 40000 error with exception: [java.lang.NumberFormatException: null], cancel other task in taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}.
2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [stream-load-upload] - start execute load
2024-08-29 11:10:11,166 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 50000 - org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@32b029ac
2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,167 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Release classloader for job 881361616757587971 with jars [file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar, file:/data/seatunnel/apache-seatunnel/connectors/connector-doris-2.3.7.jar]
2024-08-29 11:10:11,167 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Closing Source Reader 0.
2024-08-29 11:10:11,170 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread st-multi-table-sink-writer-2
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread hz.main.seaTunnel.task.thread-71
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread ForkJoinPool.commonPool-worker-0
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread stream-load-upload
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread st-multi-table-sink-writer-1
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread stream-load-check
2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} complete with FAILED
2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 50000 error with exception: [java.lang.NumberFormatException: null], cancel other task in taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}.
2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} complete with state FAILED
2024-08-29 11:10:11,187 INFO  [o.a.h.i.e.RetryExec           ] [stream-load-upload] - I/O exception (java.net.SocketException) caught when processing request to {}->http://10.103.22.20:8040: Socket is closed
2024-08-29 11:10:11,187 INFO  [o.a.h.i.e.RetryExec           ] [stream-load-upload] - Retrying request to {}->http://10.103.22.20:8040
2024-08-29 11:10:11,189 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-45] - received slot release request, jobID: 881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=37, ownerJobID=881361616757587971, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
2024-08-29 11:10:11,191 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.generic-operation.thread-5] - [10.103.22.10]:5802 [seatunnel] [5.1] Task (TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}) need cancel.
2024-08-29 11:10:11,192 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@228a8a17
2024-08-29 11:10:11,192 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}
2024-08-29 11:10:11,192 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - Release classloader for job 881361616757587971 with jars [file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar]
2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread MaintenanceTimer-2-thread-1
2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BufferPoolPruner-1-thread-1
2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread cluster-ClusterId{value='66cfd9f81461db4b6c7a5a33', description='null'}-10.103.12.39:27017
2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}
2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread cluster-rtt-ClusterId{value='66cfd9f81461db4b6c7a5a33', description='null'}-10.103.12.39:27017
2024-08-29 11:10:11,201 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} complete with CANCELED
2024-08-29 11:10:11,201 INFO  [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-08-29 11:10:11,201 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-08-29 11:10:13,141 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-08-29 11:10:13,596 INFO  [c.m.k.c.s.h.HeartbeatManager  ] [debezium-reader-0] - Generating heartbeat event. {"_data": {"$binary": {"base64": "gmbP5pIAAAABRjxfaWQAPDRkMDA4ODRjMmYyYTI0MWJkNzAwY2ViODZUVlNPVQAAWhAEI6PeJyh4Q4WSkVMWgbWnSgQ=", "subType": "00"}}}
2024-08-29 11:10:13,597 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-08-29 11:10:14,207 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-19] - received slot release request, jobID: 881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=36, ownerJobID=881361616757587971, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
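The stack trace shows the failure path: `IncrementalSplitState.markEnterPureIncrementPhaseIfNeed` calls `Offset.isAtOrAfter`, which calls `ChangeStreamOffset.compareTo`, which calls `ChangeStreamOffset.getTimestamp`, which passes a null value to `Long.parseLong`. The first log line also shows `MongodbStreamFetchTask` warning that it could not extract `clusterTime` from a change stream event. One plausible reading, not confirmed by the log, is that an offset without a timestamp field reached the comparison. The sketch below is a hypothetical stand-in, not SeaTunnel's `ChangeStreamOffset`: it assumes an offset stored as a `Map<String, String>` under a made-up `timestamp` key and contrasts an unchecked parse with a null-safe comparison.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a change-stream offset. The Map<String, String>
// layout and the "timestamp" key are assumptions for illustration; this is
// NOT SeaTunnel's ChangeStreamOffset implementation.
public class ChangeStreamOffsetSketch implements Comparable<ChangeStreamOffsetSketch> {
    static final String TIMESTAMP_KEY = "timestamp"; // hypothetical key name

    private final Map<String, String> offset = new HashMap<>();

    // Builds an offset; a null timestamp models an event whose clusterTime
    // could not be extracted.
    static ChangeStreamOffsetSketch of(String timestamp) {
        ChangeStreamOffsetSketch o = new ChangeStreamOffsetSketch();
        if (timestamp != null) {
            o.offset.put(TIMESTAMP_KEY, timestamp);
        }
        return o;
    }

    // Mirrors the failing path: Long.parseLong(null) throws NumberFormatException.
    long getTimestampUnsafe() {
        return Long.parseLong(offset.get(TIMESTAMP_KEY));
    }

    // Null-safe variant: a missing timestamp sorts before every real one.
    long getTimestampOrMin() {
        String raw = offset.get(TIMESTAMP_KEY);
        return raw == null ? Long.MIN_VALUE : Long.parseLong(raw);
    }

    @Override
    public int compareTo(ChangeStreamOffsetSketch that) {
        return Long.compare(getTimestampOrMin(), that.getTimestampOrMin());
    }

    // Hypothetical helper named after Offset.isAtOrAfter in the stack trace,
    // assumed here to mean "compareTo(that) >= 0".
    boolean isAtOrAfter(ChangeStreamOffsetSketch that) {
        return compareTo(that) >= 0;
    }

    public static void main(String[] args) {
        ChangeStreamOffsetSketch noTimestamp = of(null);
        ChangeStreamOffsetSketch stop = of("1724900000"); // arbitrary example value
        try {
            noTimestamp.getTimestampUnsafe();
        } catch (NumberFormatException e) {
            System.out.println("unsafe path failed: " + e.getMessage());
        }
        System.out.println("at or after stop offset? " + noTimestamp.isAtOrAfter(stop));
    }
}
```

Treating a missing timestamp as the smallest value means such an offset never counts as being at or after a real stop offset, so it cannot trigger the pure-increment transition early. Falling back to the current time, as the `MongodbStreamFetchTask` warning says it does for events, is another option; which behavior is correct is a decision for the connector maintainers.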

Zeta or Flink or Spark Version

Zeta 2.3.7

Java or Scala Version

Java 8

Screenshots

No response

jw-itq commented 4 days ago

I have also encountered this issue.