DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0
4.01k stars 1.69k forks source link

Logminer to oracle 运行报错,求正确的JSON配置文件格式 #939

Open queyijun opened 2 years ago

queyijun commented 2 years ago

JSON脚本:

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "jdbcUrl": "jdbc:oracle:thin:@192.168.147.128:1521:helowin",
            "username": "qyj",
            "password": "qyj",
            "supportAutoAddLog": false,
            "table": [
              "QYJ.test_01"
            ],
            "cat": "UPDATE,INSERT,DELETE",
            "pavingData": true,
            "readPosition": "CURRENT",
            "queryTimeout": 3000
          },
          "name": "oraclelogminerreader"
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "allReplace": true,
            "username": "qyj",9
            "password": "qyj",
            "connection": [
              {
                "schema": "QYJ",
                "jdbcUrl": "jdbc:oracle:thin:@192.168.147.128:1521:helowin",
                "table": [
                  "test_02"
                ]
              }
            ],
            "column": [
              {
                "name": "id",
                "type": "string"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "writeMode": "update",
            "updateKey": {
              "key": [
                "id"
              ]
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 1
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {
        "isLogger": true,
        "level": "debug",
        "path": "",
        "pattern": ""
      }
    }
  }
}

数据库案例: 0000001 建表语句:

-- ----------------------------
-- Table structure for test_01
-- ----------------------------
DROP TABLE "QYJ"."test_01";
CREATE TABLE "QYJ"."test_01" (
  "id" VARCHAR2(20 BYTE) NOT NULL,
  "name" VARCHAR2(255 BYTE)
)
TABLESPACE "USERS"
LOGGING
NOCOMPRESS
PCTFREE 10
INITRANS 1
STORAGE (
  INITIAL 65536 
  NEXT 1048576 
  MINEXTENTS 1
  MAXEXTENTS 2147483645
  BUFFER_POOL DEFAULT
)
PARALLEL 1
NOCACHE
DISABLE ROW MOVEMENT
;

-- ----------------------------
-- Primary Key structure for table test_01
-- ----------------------------
ALTER TABLE "QYJ"."test_01" ADD CONSTRAINT "SYS_C0011089" PRIMARY KEY ("id");

启动项目后,执行sql语句: INSERT INTO QYJ."test_01" t (t."id",t."name") VALUES ('55','aa'); chunjun等待读取文件之后, 报错内容如下: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 2 at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2207) at com.google.common.cache.LocalCache.get(LocalCache.java:3953) at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4790) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.getOrCreateFieldNamedPstmt(PreparedStmtProxy.java:151) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.convertToExternal(PreparedStmtProxy.java:122) at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeMultipleRecordsInternal(JdbcOutputFormat.java:215) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:483) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.IndexOutOfBoundsException: Index: -1, Size: 2 at java.util.LinkedList.checkElementIndex(LinkedList.java:555) at java.util.LinkedList.get(LinkedList.java:476) at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.getColumnMeta(DynamicPreparedStmt.java:240) at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.buildStmt(DynamicPreparedStmt.java:84) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.lambda$getOrCreateFieldNamedPstmt$0(PreparedStmtProxy.java:155) at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4793) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201) ... 15 more 2022-06-10 16:18:00,050 - 76776 WARN [dirty-consumer-pool-2-thread-2] com.dtstack.chunjun.dirty.log.LogDirtyDataCollector: ====================Dirty Data===================== DirtyDataEntry[jobId='9c17ecfce066c8c2cb97f71456318c6e', jobName='Flink_Job', operatorName='Sink: oraclesinkfactory', dirtyContent='{"extHeader":["schema","opTime","table","scn","ts"],"byteSize":1285,"string":"(1159599,QYJ,test_01,6940940078327926784,2022-06-10 16:17:32.000000,INSERT,null,null,56,aa)","headers":["scn","schema","table","ts","opTime","type","before_id","before_name","after_id","after_name"],"arity":10,"rowKind":"INSERT","headerInfo":{"scn":0,"schema":1,"table":2,"ts":3,"opTime":4,"type":5,"before_id":6,"before_name":7,"after_id":8,"after_name":9}}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException: JdbcOutputFormat [Flink_Job] writeRecord error: when converting field[0] in Row(+I(1159599,QYJ,test_01,6940940078327926784,2022-06-10 16:17:32.000000,INSERT,null,null,56,aa)) com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 2 at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:358) at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:196) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 2 at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2207) at com.google.common.cache.LocalCache.get(LocalCache.java:3953) at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4790) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.getOrCreateFieldNamedPstmt(PreparedStmtProxy.java:151) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.writeSingleRecordInternal(PreparedStmtProxy.java:199) at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:193) ... 13 more Caused by: java.lang.IndexOutOfBoundsException: Index: -1, Size: 2 at java.util.LinkedList.checkElementIndex(LinkedList.java:555) at java.util.LinkedList.get(LinkedList.java:476) at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.getColumnMeta(DynamicPreparedStmt.java:240) at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.buildStmt(DynamicPreparedStmt.java:84) at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.lambda$getOrCreateFieldNamedPstmt$0(PreparedStmtProxy.java:155) at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4793) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201) ... 18 more ', fieldName='null', createTime=2022-06-10 16:17:59.999]

=================================================== 2022-06-10 16:18:00,050 - 76776 ERROR [timer-data-write-thread-pool-1-thread-1] com.dtstack.chunjun.sink.format.BaseRichOutputFormat:Writing records failed. com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)


之后再执行一次 INSERT INTO QYJ."test_01" t (t."id",t."name") VALUES ('57','aa'); 报错内容如下: 2022-06-10 16:19:51,023 - 187749 INFO [LogMinerConnection-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:query LogMiner data, startScn:1159671,endScn:1159691,timeConsuming 1 2022-06-10 16:19:51,026 - 187752 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:getEndScn success,startScn:1159671,endScn:1159696, loadRedoLog:true 2022-06-10 16:19:51,112 - 187838 INFO [Sink: oraclesinkfactory (1/1)#0] com.dtstack.chunjun.dirty.log.LogDirtyDataCollector:Print consumer closed. 2022-06-10 16:19:51,115 - 187841 WARN [Sink: oraclesinkfactory (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Sink: oraclesinkfactory (1/1)#0 (212b9c5e13788e5f75e1ac3ca8d96c89) switched from RUNNING to FAILED. com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:353) at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ... 1 more [CIRCULAR REFERENCE:com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]] 2022-06-10 16:19:51,115 - 187841 INFO [Sink: oraclesinkfactory (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Freeing task resources for Sink: oraclesinkfactory (1/1)#0 (212b9c5e13788e5f75e1ac3ca8d96c89). 2022-06-10 16:19:51,120 - 187846 INFO [flink-akka.actor.default-dispatcher-39] org.apache.flink.runtime.taskexecutor.TaskExecutor:Un-registering task and sending final execution state FAILED to JobManager for task Sink: oraclesinkfactory (1/1)#0 212b9c5e13788e5f75e1ac3ca8d96c89. 2022-06-10 16:19:51,121 - 187847 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.executiongraph.Execution:Sink: oraclesinkfactory (1/1) (212b9c5e13788e5f75e1ac3ca8d96c89) switched from RUNNING to FAILED on 96d2605c-bca9-4017-90df-fa588ced9841 @ activate.navicat.com (dataPort=-1). com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:353) at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ... 1 more [CIRCULAR REFERENCE:com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]] 2022-06-10 16:19:51,125 - 187851 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy:Calculating tasks to restart to recover the failed task 0a448493b4782967b150582570326227_0. 2022-06-10 16:19:51,125 - 187851 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy:2 tasks should be restarted to recover the failed task 0a448493b4782967b150582570326227_0. 2022-06-10 16:19:51,127 - 187853 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.executiongraph.ExecutionGraph:Job Flink_Job (9c17ecfce066c8c2cb97f71456318c6e) switched from state RUNNING to FAILING. org.apache.flink.runtime.JobException: The failure is not recoverable at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592) at akka.actor.ActorCell.receiveMessage(ActorCell.scala) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:353) at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ... 1 more [CIRCULAR REFERENCE:com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]] 2022-06-10 16:19:51,130 - 187856 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.executiongraph.Execution:Source: oraclelogminersourcefactory (1/1) (7c6b296a8381b7b88c552a1461403cbd) switched from RUNNING to CANCELING. 2022-06-10 16:19:51,130 - 187856 INFO [flink-akka.actor.default-dispatcher-39] org.apache.flink.runtime.taskmanager.Task:Attempting to cancel task Source: oraclelogminersourcefactory (1/1)#0 (7c6b296a8381b7b88c552a1461403cbd). 2022-06-10 16:19:51,130 - 187856 INFO [flink-akka.actor.default-dispatcher-39] org.apache.flink.runtime.taskmanager.Task:Source: oraclelogminersourcefactory (1/1)#0 (7c6b296a8381b7b88c552a1461403cbd) switched from RUNNING to CANCELING. 2022-06-10 16:19:51,131 - 187857 INFO [flink-akka.actor.default-dispatcher-39] org.apache.flink.runtime.taskmanager.Task:Triggering cancellation of task code Source: oraclelogminersourcefactory (1/1)#0 (7c6b296a8381b7b88c552a1461403cbd). 2022-06-10 16:19:51,132 - 187858 WARN [Legacy Source Thread - Source: oraclelogminersourcefactory (1/1)#0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener:Get data from queue error: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522) at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.getData(LogMinerListener.java:203) at com.dtstack.chunjun.connector.oraclelogminer.inputformat.OracleLogMinerInputFormat.nextRecordInternal(OracleLogMinerInputFormat.java:92) at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:197) at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267) 2022-06-10 16:19:51,137 - 187863 INFO [Legacy Source Thread - Source: oraclelogminersourcefactory (1/1)#0] com.dtstack.chunjun.dirty.log.LogDirtyDataCollector:Print consumer closed. 2022-06-10 16:19:51,137 - 187863 WARN [Legacy Source Thread - Source: oraclelogminersourcefactory (1/1)#0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:close logMiner failed, e = java.sql.SQLException: 关闭的语句 at oracle.jdbc.driver.OracleClosedStatement.execute(OracleClosedStatement.java:2365) at oracle.jdbc.driver.OracleStatementWrapper.execute(OracleStatementWrapper.java:300) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.disConnect(LogMinerConnection.java:223) at java.lang.Iterable.forEach(Iterable.java:75) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.stop(LogMinerHelper.java:319) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.stop(LogMinerListener.java:183) at com.dtstack.chunjun.connector.oraclelogminer.inputformat.OracleLogMinerInputFormat.closeInternal(OracleLogMinerInputFormat.java:104) at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:216) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:170) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:151) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)

2022-06-10 16:19:59,938 - 196664 WARN [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener:LogMinerListener thread exception: current scn =1159670, e = java.lang.RuntimeException: java.sql.SQLRecoverableException: 关闭的连接 at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.startOrUpdateLogMiner(LogMinerConnection.java:290) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.preLoad(LogMinerHelper.java:168) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.chooseAndPreLoadConnection(LogMinerHelper.java:282) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.hasNext(LogMinerHelper.java:251) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.run(LogMinerListener.java:141) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.sql.SQLRecoverableException: 关闭的连接 at oracle.jdbc.driver.PhysicalConnection.prepareCall(PhysicalConnection.java:1708) at oracle.jdbc.driver.PhysicalConnection.prepareCall(PhysicalConnection.java:1677) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.resetLogminerStmt(LogMinerConnection.java:1048) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.startOrUpdateLogMiner(LogMinerConnection.java:271) ... 7 more

2022-06-10 16:20:01,942 - 198668 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper:restart connection, startScn: 1159671,endScn: 1159696 2022-06-10 16:20:01,942 - 198668 WARN [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:close logMiner failed, e = java.sql.SQLException: 关闭的语句 at oracle.jdbc.driver.OracleClosedStatement.execute(OracleClosedStatement.java:2365) at oracle.jdbc.driver.OracleStatementWrapper.execute(OracleStatementWrapper.java:300) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.disConnect(LogMinerConnection.java:223) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.restart(LogMinerHelper.java:227) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.restart(LogMinerHelper.java:217) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.run(LogMinerListener.java:147) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

2022-06-10 16:20:01,942 - 198668 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:connection properties is {oracle.jdbc.ReadTimeout=3060000, user=qyj, password=qyj} 2022-06-10 16:20:11,138 - 207864 INFO [Legacy Source Thread - Source: oraclelogminersourcefactory (1/1)#0] com.dtstack.chunjun.source.format.BaseRichInputFormat:subtask input close finished 2022-06-10 16:20:11,138 - 207864 WARN [Source: oraclelogminersourcefactory (1/1)#0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:close logMiner failed, e = java.sql.SQLException: 关闭的语句 at oracle.jdbc.driver.OracleClosedStatement.execute(OracleClosedStatement.java:2365) at oracle.jdbc.driver.OracleStatementWrapper.execute(OracleStatementWrapper.java:300) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection.disConnect(LogMinerConnection.java:223) at java.lang.Iterable.forEach(Iterable.java:75) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.stop(LogMinerHelper.java:319) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.stop(LogMinerListener.java:183) at com.dtstack.chunjun.connector.oraclelogminer.inputformat.OracleLogMinerInputFormat.closeInternal(OracleLogMinerInputFormat.java:104) at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:216) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:170) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:165) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:184) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) at java.lang.Thread.run(Thread.java:750)

2022-06-10 16:20:11,139 - 207865 INFO [Source: oraclelogminersourcefactory (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Source: oraclelogminersourcefactory (1/1)#0 (7c6b296a8381b7b88c552a1461403cbd) switched from CANCELING to CANCELED. 2022-06-10 16:20:11,139 - 207865 INFO [Source: oraclelogminersourcefactory (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Freeing task resources for Source: oraclelogminersourcefactory (1/1)#0 (7c6b296a8381b7b88c552a1461403cbd). 2022-06-10 16:20:11,140 - 207866 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.taskexecutor.TaskExecutor:Un-registering task and sending final execution state CANCELED to JobManager for task Source: oraclelogminersourcefactory (1/1)#0 7c6b296a8381b7b88c552a1461403cbd. 2022-06-10 16:20:11,141 - 207867 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.executiongraph.Execution:Source: oraclelogminersourcefactory (1/1) (7c6b296a8381b7b88c552a1461403cbd) switched from CANCELING to CANCELED. 2022-06-10 16:20:11,143 - 207869 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.executiongraph.ExecutionGraph:Job Flink_Job (9c17ecfce066c8c2cb97f71456318c6e) switched from state FAILING to FAILED. org.apache.flink.runtime.JobException: The failure is not recoverable at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592) at akka.actor.ActorCell.receiveMessage(ActorCell.scala) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:353) at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ... 1 more [CIRCULAR REFERENCE:com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]] 2022-06-10 16:20:11,144 - 207870 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.checkpoint.CheckpointCoordinator:Stopping checkpoint coordinator for job 9c17ecfce066c8c2cb97f71456318c6e. 2022-06-10 16:20:11,147 - 207873 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:Shutting down 2022-06-10 16:20:11,151 - 207877 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.dispatcher.Dispatcher:Job 9c17ecfce066c8c2cb97f71456318c6e reached terminal state FAILED. 2022-06-10 16:20:11,151 - 207877 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Shutting down Flink Mini Cluster 2022-06-10 16:20:11,152 - 207878 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.TaskExecutor:Stopping TaskExecutor akka://flink/user/rpc/taskmanager_0. 2022-06-10 16:20:11,152 - 207878 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.TaskExecutor:Close ResourceManager connection 725425c86a2323f0bde252bf3e5051e4. 2022-06-10 16:20:11,152 - 207878 INFO [main] org.apache.flink.runtime.rest.RestServerEndpoint:Shutting down rest endpoint. 2022-06-10 16:20:11,152 - 207878 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.resourcemanager.ResourceManager:Closing TaskExecutor connection 96d2605c-bca9-4017-90df-fa588ced9841 because: The TaskExecutor is shutting down. 2022-06-10 16:20:11,152 - 207878 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.jobmaster.JobMaster:Stopping the JobMaster for job Flink_Job(9c17ecfce066c8c2cb97f71456318c6e). 2022-06-10 16:20:11,153 - 207879 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.TaskExecutor:Close JobManager connection for job 9c17ecfce066c8c2cb97f71456318c6e. 2022-06-10 16:20:11,153 - 207879 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Suspending SlotPool. 2022-06-10 16:20:11,153 - 207879 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.jobmaster.JobMaster:Close ResourceManager connection 725425c86a2323f0bde252bf3e5051e4: Stopping JobMaster for job Flink_Job(9c17ecfce066c8c2cb97f71456318c6e).. 2022-06-10 16:20:11,153 - 207879 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Stopping SlotPool. 2022-06-10 16:20:11,153 - 207879 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.resourcemanager.ResourceManager:Disconnect job manager 94fcfd30e277365db3c45c2d07bb4bd7@akka://flink/user/rpc/jobmanager_3 for job 9c17ecfce066c8c2cb97f71456318c6e from the resource manager. 2022-06-10 16:20:11,155 - 207881 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl:Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=512.000mb (536870912 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationId: 94a81a902fbd9e9efafeb7aaf26b2133, jobId: 9c17ecfce066c8c2cb97f71456318c6e). 2022-06-10 16:20:11,167 - 207893 INFO [mini-cluster-io-thread-19] org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl:JobManager for job 9c17ecfce066c8c2cb97f71456318c6e with leader id 94fcfd30e277365db3c45c2d07bb4bd7 lost leadership. 2022-06-10 16:20:11,169 - 207895 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Stop job leader service. 2022-06-10 16:20:11,169 - 207895 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:Shutting down TaskExecutorLocalStateStoresManager. 2022-06-10 16:20:11,169 - 207895 INFO [ForkJoinPool.commonPool-worker-8] org.apache.flink.runtime.webmonitor.WebMonitorEndpoint:Removing cache directory C:\Users\thinkpad\AppData\Local\Temp\flink-web-ui 2022-06-10 16:20:11,170 - 207896 INFO [ForkJoinPool.commonPool-worker-8] org.apache.flink.runtime.rest.RestServerEndpoint:Shut down complete. 2022-06-10 16:20:11,172 - 207898 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager removed spill file directory C:\Users\thinkpad\AppData\Local\Temp\flink-io-2adabdda-5bff-469f-9429-3ece499df45e 2022-06-10 16:20:11,172 - 207898 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.resourcemanager.ResourceManager:Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed.. 2022-06-10 16:20:11,172 - 207898 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.io.network.NettyShuffleEnvironment:Shutting down the network environment and its components. 2022-06-10 16:20:11,172 - 207898 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent:Closing components. 2022-06-10 16:20:11,173 - 207899 INFO [flink-akka.actor.default-dispatcher-42] org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess:Stopping SessionDispatcherLeaderProcess. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.dispatcher.Dispatcher:Stopping dispatcher akka://flink/user/rpc/dispatcher_2. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.dispatcher.Dispatcher:Stopping all currently running jobs of dispatcher akka://flink/user/rpc/dispatcher_2. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager removed spill file directory C:\Users\thinkpad\AppData\Local\Temp\flink-netty-shuffle-6819afe9-298a-4ad4-b615-409a98fedd9b 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl:Closing the SlotManager. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.KvStateService:Shutting down the kvState service and its components. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator:Shutting down back pressure request coordinator. 2022-06-10 16:20:11,174 - 207900 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl:Suspending the SlotManager. 2022-06-10 16:20:11,175 - 207901 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Stop job leader service. 2022-06-10 16:20:11,175 - 207901 INFO [flink-akka.actor.default-dispatcher-41] org.apache.flink.runtime.dispatcher.Dispatcher:Stopped dispatcher akka://flink/user/rpc/dispatcher_2. 2022-06-10 16:20:11,176 - 207902 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.filecache.FileCache:removed file cache directory C:\Users\thinkpad\AppData\Local\Temp\flink-dist-cache-366ff838-9b5d-44c4-9465-74aed1243d1f 2022-06-10 16:20:11,177 - 207903 INFO [flink-akka.actor.default-dispatcher-43] org.apache.flink.runtime.taskexecutor.TaskExecutor:Stopped TaskExecutor akka://flink/user/rpc/taskmanager_0. 2022-06-10 16:20:11,178 - 207904 INFO [AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Stopping Akka RPC service. 2022-06-10 16:20:11,209 - 207935 INFO [flink-metrics-4] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Stopping Akka RPC service. 2022-06-10 16:20:11,210 - 207936 INFO [flink-metrics-4] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Stopped Akka RPC service. 2022-06-10 16:20:11,222 - 207948 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.blob.AbstractBlobCache:Shutting down BLOB cache 2022-06-10 16:20:11,223 - 207949 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.blob.AbstractBlobCache:Shutting down BLOB cache 2022-06-10 16:20:11,225 - 207951 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.blob.BlobServer:Stopped BLOB server at 0.0.0.0:63128 2022-06-10 16:20:11,225 - 207951 INFO [flink-akka.actor.default-dispatcher-37] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Stopped Akka RPC service. Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811) at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:174) at com.dtstack.chunjun.Main.exeSyncJob(Main.java:227) at com.dtstack.chunjun.Main.main(Main.java:122) at com.dtstack.chunjun.local.test.LocalTest.main(LocalTest.java:178) Caused by: org.apache.flink.runtime.JobException: The failure is not recoverable at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592) at akka.actor.ActorCell.receiveMessage(ActorCell.scala) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487) at java.util.ArrayList.forEach(ArrayList.java:1259) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487) at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:353) at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ... 1 more [CIRCULAR REFERENCE:com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]] 2022-06-10 16:20:11,978 - 208704 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:oracle info OracleInfo{version=11, isRacMode=false, isCdbMode=false, encoding='SIMPLIFIED CHINESE_CHINA.AL32UTF8'} 2022-06-10 16:20:11,979 - 208705 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:get connection successfully, url:jdbc:oracle:thin:@192.168.147.128:1521:helowin, username:qyj, Oracle info:OracleInfo{version=11, isRacMode=false, isCdbMode=false, encoding='SIMPLIFIED CHINESE_CHINA.AL32UTF8'} 2022-06-10 16:20:11,983 - 208709 INFO [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerConnection:Log group changed, startScn = 1159671,endScn = 1159696 new log group = [ { "fileName": "/home/oracle/app/oracle/oradata/helowin/redo02.log", "firstChange": 1148842, "nextChange": 281474976710655, "thread": 1, "status": 0, "type": "ONLINE", "bytes": 0 } ] 2022-06-10 16:20:11,983 - 208709 WARN [LogMiner-pool-0] com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener:LogMinerListener run failed, Throwable = java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@18b7443 rejected from java.util.concurrent.ThreadPoolExecutor@25196217[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 16] at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.restart(LogMinerHelper.java:244) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.restart(LogMinerHelper.java:217) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerListener.run(LogMinerListener.java:147) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@18b7443 rejected from java.util.concurrent.ThreadPoolExecutor@25196217[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 16] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.loadData(LogMinerHelper.java:203) at com.dtstack.chunjun.connector.oraclelogminer.listener.LogMinerHelper.restart(LogMinerHelper.java:242) ... 5 more

Exception: java.util.concurrent.RejectedExecutionException thrown from the UncaughtExceptionHandler in thread "LogMiner-pool-0" 与目标 VM 断开连接, 地址为: ''127.0.0.1:63119',传输: '套接字''

进程已结束,退出代码1

EngonVHKxZ commented 2 years ago
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "jdbcUrl": "jdbc:oracle:thin:@192.168.174.128:1521:helowin",
            "username": "roma_logminer",
            "password": "roma_logminer",
            "supportAutoAddLog": false,
            "table": ["ROMA_LOGMINER.LOGMINER_FROM"],
            "cat": "INSERT,UPDATE,DELETE",
            "split" : true,
            "readPosition" : "current",
            "queryTimeout": 3000
          },
          "name": "oraclelogminerreader"
        },
        "writer": {
          "parameter": {
            "writeMode":"update",
            "updateKey": {
              "key": ["ID"]
            },
            "allReplace": true,
            "username": "roma_logminer",
            "password": "roma_logminer",
            "connection": [{
              "jdbcUrl": "jdbc:oracle:thin:@192.168.174.128:1521:helowin",
              "table": ["ROMA_LOGMINER.LOGMINER_TO"]
            }],
            "column": [
              {
                "name": "ID",
                "type": "string"
              },
              {
                "name": "NAME",
                "type": "string"
              }
            ]
          },
          "name": "oraclewriter"
        },
        "nameMapping":{
          "schemaMappings":{
            "ROMA_LOGMINER":"ROMA_LOGMINER"
          },
          "tableMappings":{
            "ROMA_LOGMINER":{
              "LOGMINER_FROM": "LOGMINER_TO"
            }
          },
          "fieldMappings":{
            "ROMA_LOGMINER":{
              "LOGMINER_FROM":{
                "ID": "ID",
                "NAME": "NAME"
              }
            }
          }
        }
      }
    ],
    "setting" : {
      "speed" : {
        "bytes" : 0,
        "channel" : 1
      }
    }
  }
}
queyijun commented 2 years ago

这份配置文件可以执行单表增删改, 内容如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "jdbcUrl": "jdbc:oracle:thin:@192.168.147.128:1521:helowin",
            "username": "qyj",
            "password": "qyj",
            "supportAutoAddLog": false,
            "table": ["QYJ.test_01"],
            "cat": "UPDATE,INSERT,DELETE",
            "split":true,
            "readPosition": "CURRENT",
            "queryTimeout": 3000
          },
          "name": "oraclelogminerreader"
        },
        "writer": {
          "parameter": {
            "writeMode": "update",
            "updateKey": {
              "key": ["id"]
            },
            "allReplace": true,
            "username": "qyj",
            "password": "qyj",
            "connection": [{
                "schema": "QYJ",
                "jdbcUrl": "jdbc:oracle:thin:@192.168.147.128:1521:helowin",
                "table": ["test_02"]
            }],
            "column": [
              {
                "name": "id",
                "type": "string"
              },
              {
                "name": "name",
                "type": "string"
              }
            ]

          },
          "name": "oraclewriter"
        },
        "nameMapping":{
          "schemaMappings":{
            "QYJ":"QYJ"
          },
          "tableMappings":{
            "QYJ":{
              "test_01": "test_02"
            }
          },
          "fieldMappings":{
            "QYJ":{
              "test_02":{
                "ID": "ID",
                "NAME": "NAME"
              }
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 1
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {
        "isLogger": true,
        "level": "debug",
        "path": "",
        "pattern": ""
      }
    }
  }
}