[Open] frank451209123 opened this issue 2 years ago
The Flink TaskManager log is shown below:

2022-09-08 18:09:49,974 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - --------------------------------------------------------------------------------
2022-09-08 18:09:49,977 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Preconfiguration:
2022-09-08 18:09:49,977 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
TM_RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456
dynamic_configs: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.network.min=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D taskmanager.memory.jvm-overhead.max=201326592b -D taskmanager.memory.jvm-overhead.min=201326592b
logs:
INFO [] - Loading configuration property: jobmanager.rpc.address, localhost
INFO [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO [] - Loading configuration property: taskmanager.memory.process.size, 1728m
INFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
INFO [] - Loading configuration property: parallelism.default, 1
INFO [] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
INFO [] - Final TaskExecutor Memory configuration:
INFO [] - Total Process Memory: 1.688gb (1811939328 bytes)
INFO [] - Total Flink Memory: 1.250gb (1342177280 bytes)
INFO [] - Total JVM Heap Memory: 512.000mb (536870902 bytes)
INFO [] - Framework: 128.000mb (134217728 bytes)
INFO [] - Task: 384.000mb (402653174 bytes)
INFO [] - Total Off-heap Memory: 768.000mb (805306378 bytes)
INFO [] - Managed: 512.000mb (536870920 bytes)
INFO [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)
INFO [] - Framework: 128.000mb (134217728 bytes)
INFO [] - Task: 0 bytes
INFO [] - Network: 128.000mb (134217730 bytes)
INFO [] - JVM Metaspace: 256.000mb (268435456 bytes)
INFO [] - JVM Overhead: 192.000mb (201326592 bytes)
2022-09-08 18:09:49,977 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - --------------------------------------------------------------------------------
2022-09-08 18:09:49,977 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Starting TaskManager (Version: 1.12.7, Scala: 2.12, Rev:88d9950, Date:2021-12-14T23:39:33+01:00)
2022-09-08 18:09:49,977 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - OS current user: root
2022-09-08 18:09:49,978 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Current Hadoop/Kerberos user:
2022-09-08 19:26:11,758 WARN com.dtstack.chunjun.dirty.log.LogDirtyDataCollector [] - ====================Dirty Data===================== DirtyDataEntry[jobId='47dcb1c312ebbbd369e3e2c3a4d92c05', jobName='sqlserverCDCToPostgresql', operatorName='Source: TableSourceScan(table=[[default_catalog, default_database, source]], fie', dirtyContent='null', errorMessage='com.dtstack.chunjun.throwable.ReadRecordException: takeEvent interrupted error java.lang.InterruptedException at com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat.nextRecordInternal(SqlServerCdcInputFormat.java:130) at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:197) at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67) at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522) at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684) at com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat.nextRecordInternal(SqlServerCdcInputFormat.java:127) ... 6 more ', fieldName='null', createTime=2022-09-08 19:26:11.749]
=================================================== 2022-09-08 19:26:11,757 ERROR com.dtstack.chunjun.source.DtInputFormatSourceFunction [] - Exception happened, start to close format com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) ~[blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) ~[blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) ~[blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:199) ~[blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67) ~[blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133) [blob_p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-c118eff94db89f8129729eb499dd7625:?] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.12-1.12.7.jar:1.12.7] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.12-1.12.7.jar:1.12.7] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267) [flink-dist_2.12-1.12.7.jar:1.12.7] 2022-09-08 19:26:11,763 WARN com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat [] - shutdown SqlServerCdcListener...... 2022-09-08 19:26:11,763 INFO com.dtstack.chunjun.dirty.log.LogDirtyDataCollector [] - Print consumer closed. 2022-09-08 19:26:31,764 INFO com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat [] - subtask input close finished 2022-09-08 19:26:31,773 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - taskNumber[0] close() 2022-09-08 19:26:51,775 INFO com.dtstack.chunjun.dirty.log.LogDirtyDataCollector [] - Print consumer closed. 
2022-09-08 19:26:51,775 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - subtask[0}] close() finished 2022-09-08 19:26:51,785 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (be99fd45957068b61784e457308cf99e) switched from CANCELING to CANCELED. 
2022-09-08 19:26:51,785 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (be99fd45957068b61784e457308cf99e). 
2022-09-08 19:26:51,790 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 be99fd45957068b61784e457308cf99e. 2022-09-08 19:26:51,853 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: d7d358ae1ec9a0ef253dff422887843a, jobId: 47dcb1c312ebbbd369e3e2c3a4d92c05). 2022-09-08 19:26:51,856 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 47dcb1c312ebbbd369e3e2c3a4d92c05 from job leader monitoring. 2022-09-08 19:26:51,857 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 47dcb1c312ebbbd369e3e2c3a4d92c05. 2022-09-08 19:27:03,888 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 47efabd84e62dba67f8ed58327637411 for job e1f46d26224b42aa18b94370510e5cd3 from resource manager with leader id 00000000000000000000000000000000. 
2022-09-08 19:27:03,889 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 47efabd84e62dba67f8ed58327637411. 2022-09-08 19:27:03,889 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job e1f46d26224b42aa18b94370510e5cd3 for job leader monitoring. 2022-09-08 19:27:03,889 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_3 with leader id 00000000-0000-0000-0000-000000000000. 2022-09-08 19:27:03,896 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2022-09-08 19:27:03,904 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_3 for job e1f46d26224b42aa18b94370510e5cd3. 2022-09-08 19:27:03,904 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job e1f46d26224b42aa18b94370510e5cd3. 2022-09-08 19:27:03,904 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job e1f46d26224b42aa18b94370510e5cd3. 2022-09-08 19:27:03,910 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 47efabd84e62dba67f8ed58327637411. 2022-09-08 19:27:03,915 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 47efabd84e62dba67f8ed58327637411. 2022-09-08 19:27:03,916 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, 
material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (842d4576a79911446754e60eda36d2be), deploy into slot with allocation id 47efabd84e62dba67f8ed58327637411. 2022-09-08 19:27:03,917 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (842d4576a79911446754e60eda36d2be) switched from CREATED to DEPLOYING. 
2022-09-08 19:27:03,918 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (842d4576a79911446754e60eda36d2be) [DEPLOYING]. 
2022-09-08 19:27:03,918 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading e1f46d26224b42aa18b94370510e5cd3/p-1e5f7cb8f16ee2b15f49200098e655906da5b1d4-9e5182b22a4d5cb6a6815ecfa4d21796 from localhost/127.0.0.1:34063 2022-09-08 19:27:03,980 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (842d4576a79911446754e60eda36d2be) [DEPLOYING]. 2022-09-08 19:27:03,981 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'class_path_2'. 2022-09-08 19:27:03,981 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'class_path_1'. 2022-09-08 19:27:03,982 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'class_path_0'. 
2022-09-08 19:27:03,982 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading e1f46d26224b42aa18b94370510e5cd3/p-e7c57bc8a3d7173eab68d17eb9a58f9cd303e6e7-cc002bf7f965c5cd9385c93c518c2347 from localhost/127.0.0.1:34063 2022-09-08 19:27:03,982 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2022-09-08 19:27:03,982 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading e1f46d26224b42aa18b94370510e5cd3/p-cd2f8171f8678a059d71abd099b5a343013c93b3-1ef11b219d8865157948395948db6a6e from localhost/127.0.0.1:34063 2022-09-08 19:27:03,982 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) -> Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) (1/1)#0 (842d4576a79911446754e60eda36d2be) switched from DEPLOYING to RUNNING. 
2022-09-08 19:27:03,982 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading e1f46d26224b42aa18b94370510e5cd3/p-aa9a45374ffd87171858a620e49c29157ab3a313-c8e075564c4bd1c52d36f58e1dc59627 from localhost/127.0.0.1:34063 2022-09-08 19:27:04,047 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) exceeded the 80 characters length limit and was truncated. 2022-09-08 19:27:04,118 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[CAST(id) AS id, CAST(parent_id) AS parent_id, CAST(project_no) AS project_no, CAST(sales_order) AS sales_order, plant_code, product_line, CAST(sales_order_item) AS sales_order_item, bay_id, CAST(order_no) AS order_no, order_status, CAST(order_level) AS order_level, sap_order_status, CAST(material_no) AS material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, CAST(update_time) AS update_time]) exceeded the 80 characters length limit and was truncated. 2022-09-08 19:27:04,120 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, parent_id, project_no, sales_order, plant_code, product_line, sales_order_item, bay_id, order_no, order_status, order_level, sap_order_status, material_no, material_descr, final_material, init_production_center, final_production_center, module_code, material_no_fellow, order_no_fellow, item_component_list, reservation, reservation_item, material_picking1_times, material_picking2_times, first_date, basic_start_time, basic_finish_time, product_start_time, mes_plan_start_time, mes_plan_end_time, mes_task_exec_time, mes_plan_u_time, is_deleted, is_modify, create_user, update_user, create_time, update_time]) exceeded the 80 characters length limit and was truncated. 
2022-09-08 19:27:04,120 INFO com.dtstack.chunjun.sink.DtOutputFormatSinkFunction [] - Start initialize output format state 2022-09-08 19:27:04,123 INFO com.dtstack.chunjun.sink.DtOutputFormatSinkFunction [] - Is restored:false 2022-09-08 19:27:04,123 INFO com.dtstack.chunjun.sink.DtOutputFormatSinkFunction [] - End initialize output format state 2022-09-08 19:27:04,305 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - initTimingSubmitTask() ,initialDelay:1000, delay:1000, MILLISECONDS 2022-09-08 19:27:04,457 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - write sql:INSERT INTO "siemens2_ETO_PDM_Header"("id", "parent_id", "project_no", "sales_order", "plant_code", "product_line", "sales_order_item", "bay_id", "order_no", "order_status", "order_level", "sap_order_status", "material_no", "material_descr", "final_material", "init_production_center", "final_production_center", "module_code", "material_no_fellow", "order_no_fellow", "item_component_list", "reservation", "reservation_item", "material_picking1_times", "material_picking2_times", "first_date", "basic_start_time", "basic_finish_time", "product_start_time", "mes_plan_start_time", "mes_plan_end_time", "mes_task_exec_time", "mes_plan_u_time", "is_deleted", "is_modify", "create_user", "update_user", "create_time", "update_time") VALUES (:id, :parent_id, :project_no, :sales_order, :plant_code, :product_line, :sales_order_item, :bay_id, :order_no, :order_status, :order_level, :sap_order_status, :material_no, :material_descr, :final_material, :init_production_center, :final_production_center, :module_code, :material_no_fellow, :order_no_fellow, :item_component_list, :reservation, :reservation_item, :material_picking1_times, :material_picking2_times, :first_date, :basic_start_time, :basic_finish_time, :product_start_time, :mes_plan_start_time, :mes_plan_end_time, :mes_task_exec_time, :mes_plan_u_time, :is_deleted, :is_modify, :create_user, :update_user, :create_time, :update_time) ON CONFLICT ("id") DO UPDATE SET "parent_id"=EXCLUDED."parent_id", "project_no"=EXCLUDED."project_no", "sales_order"=EXCLUDED."sales_order", "plant_code"=EXCLUDED."plant_code", "product_line"=EXCLUDED."product_line", "sales_order_item"=EXCLUDED."sales_order_item", "bay_id"=EXCLUDED."bay_id", "order_no"=EXCLUDED."order_no", "order_status"=EXCLUDED."order_status", "order_level"=EXCLUDED."order_level", "sap_order_status"=EXCLUDED."sap_order_status", "material_no"=EXCLUDED."material_no", "material_descr"=EXCLUDED."material_descr", "final_material"=EXCLUDED."final_material", "init_production_center"=EXCLUDED."init_production_center", "final_production_center"=EXCLUDED."final_production_center", "module_code"=EXCLUDED."module_code", "material_no_fellow"=EXCLUDED."material_no_fellow", "order_no_fellow"=EXCLUDED."order_no_fellow", "item_component_list"=EXCLUDED."item_component_list", "reservation"=EXCLUDED."reservation", "reservation_item"=EXCLUDED."reservation_item", "material_picking1_times"=EXCLUDED."material_picking1_times", "material_picking2_times"=EXCLUDED."material_picking2_times", "first_date"=EXCLUDED."first_date", "basic_start_time"=EXCLUDED."basic_start_time", "basic_finish_time"=EXCLUDED."basic_finish_time", "product_start_time"=EXCLUDED."product_start_time", "mes_plan_start_time"=EXCLUDED."mes_plan_start_time", "mes_plan_end_time"=EXCLUDED."mes_plan_end_time", "mes_task_exec_time"=EXCLUDED."mes_task_exec_time", "mes_plan_u_time"=EXCLUDED."mes_plan_u_time", "is_deleted"=EXCLUDED."is_deleted", 
"is_modify"=EXCLUDED."is_modify", "create_user"=EXCLUDED."create_user", "update_user"=EXCLUDED."update_user", "create_time"=EXCLUDED."create_time", "update_time"=EXCLUDED."update_time" 2022-09-08 19:27:04,487 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - subTask[0}] wait finished 2022-09-08 19:27:04,517 INFO com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat [] - [JdbcOutputFormat] open successfully, checkpointMode = AT_LEAST_ONCE, checkpointEnabled = false, flushIntervalMills = 1000, batchSize = 1024, [JdbcConf]: { "semantic" : "at-least-once", "errorRecord" : 0, "checkFormat" : true, "parallelism" : 1, "executeDdlAble" : false, "pollingInterval" : 5000, "increment" : false, "flushIntervalMills" : 1000, "polling" : false, "mode" : "UPDATE", "password" : "**", "restoreColumnIndex" : -1, "connection" : [ { "table" : [ "siemens2_ETO_PDM_Header" ], "jdbcUrl" : "jdbc:postgresql://192.168.0.116:52345/postgres", "allReplace" : true } ], "table" : "siemens2_ETO_PDM_Header", "queryTimeOut" : 0, "fetchSize" : 0, "useMaxFunc" : false, "uniqueKey" : [ "id" ], "column" : [ { "name" : "id", "type" : "DECIMAL(10, 0) NOT NULL", "index" : 0, "notNull" : false, "part" : false }, { "name" : "parent_id", "type" : "DECIMAL(10, 0) NOT NULL", "index" : 1, "notNull" : false, "part" : false }, { "name" : "project_no", "type" : "VARCHAR(50) NOT NULL", "index" : 2, "notNull" : false, "part" : false }, { "name" : "sales_order", "type" : "VARCHAR(50) NOT NULL", "index" : 3, "notNull" : false, "part" : false }, { "name" : "plant_code", "type" : "VARCHAR(50)", "index" : 4, "notNull" : false, "part" : false }, { "name" : "product_line", "type" : "VARCHAR(50)", "index" : 5, "notNull" : false, "part" : false }, { "name" : "sales_order_item", "type" : "VARCHAR(50) NOT NULL", "index" : 6, "notNull" : false, "part" : false }, { "name" : "bay_id", "type" : "VARCHAR(50)", "index" : 7, "notNull" : false, "part" : false }, { "name" : "order_no", "type" : "VARCHAR(50) NOT NULL", "index" : 8, "notNull" : false, "part" : false }, { "name" : "order_status", "type" : "INT", "index" : 9, "notNull" : false, "part" : false }, { "name" : "order_level", "type" : "INT NOT NULL", "index" : 10, "notNull" : false, "part" : false }, { "name" : "sap_order_status", "type" : "VARCHAR(50)", "index" : 11, "notNull" : false, "part" : false }, { "name" : "material_no", "type" : "VARCHAR(50) NOT NULL", "index" : 12, "notNull" : false, "part" : false }, { "name" : "material_descr", "type" : "VARCHAR(255)", "index" : 13, "notNull" : false, "part" : false }, { "name" : "final_material", "type" : "VARCHAR(50)", "index" : 14, "notNull" : false, "part" : false }, { "name" : "init_production_center", "type" : "VARCHAR(50)", "index" : 15, "notNull" : false, "part" : false }, { "name" : "final_production_center", "type" : "VARCHAR(50)", "index" : 16, "notNull" : false, "part" : false }, { "name" : "module_code", "type" : "VARCHAR(50)", "index" : 17, "notNull" : false, "part" : false }, { "name" : "material_no_fellow", "type" : "VARCHAR(50)", "index" : 18, "notNull" : false, "part" : false }, { "name" : "order_no_fellow", "type" : "VARCHAR(50)", "index" : 19, "notNull" : false, "part" : false }, { "name" : "item_component_list", "type" : "VARCHAR(50)", "index" : 20, "notNull" : false, "part" : false }, { "name" : "reservation", "type" : "VARCHAR(50)", "index" : 21, "notNull" : false, "part" : false }, { "name" : "reservation_item", "type" : "VARCHAR(50)", "index" : 22, "notNull" : false, "part" : false }, { "name" : 
"material_picking1_times", "type" : "INT", "index" : 23, "notNull" : false, "part" : false }, { "name" : "material_picking2_times", "type" : "INT", "index" : 24, "notNull" : false, "part" : false }, { "name" : "first_date", "type" : "TIMESTAMP(6)", "index" : 25, "notNull" : false, "part" : false }, { "name" : "basic_start_time", "type" : "TIMESTAMP(6)", "index" : 26, "notNull" : false, "part" : false }, { "name" : "basic_finish_time", "type" : "TIMESTAMP(6)", "index" : 27, "notNull" : false, "part" : false }, { "name" : "product_start_time", "type" : "TIMESTAMP(6)", "index" : 28, "notNull" : false, "part" : false }, { "name" : "mes_plan_start_time", "type" : "TIMESTAMP(6)", "index" : 29, "notNull" : false, "part" : false }, { "name" : "mes_plan_end_time", "type" : "TIMESTAMP(6)", "index" : 30, "notNull" : false, "part" : false }, { "name" : "mes_task_exec_time", "type" : "TIMESTAMP(6)", "index" : 31, "notNull" : false, "part" : false }, { "name" : "mes_plan_u_time", "type" : "TIMESTAMP(6)", "index" : 32, "notNull" : false, "part" : false }, { "name" : "is_deleted", "type" : "INT", "index" : 33, "notNull" : false, "part" : false }, { "name" : "is_modify", "type" : "INT", "index" : 34, "notNull" : false, "part" : false }, { "name" : "create_user", "type" : "VARCHAR(50)", "index" : 35, "notNull" : false, "part" : false }, { "name" : "update_user", "type" : "VARCHAR(50)", "index" : 36, "notNull" : false, "part" : false }, { "name" : "create_time", "type" : "TIMESTAMP(6)", "index" : 37, "notNull" : false, "part" : false }, { "name" : "update_time", "type" : "TIMESTAMP(6)", "index" : 38, "notNull" : false, "part" : false } ], "errorPercentage" : -1, "fieldNameList" : [ ], "withNoLock" : false, "increColumnIndex" : -1, "allReplace" : true, "initReporter" : true, "jdbcUrl" : "jdbc:postgresql://192.168.0.116:52345/postgres", "connectTimeOut" : 0, "batchSize" : 1024, "speedBytes" : 0, "rowSizeCalculatorType" : "objectSizeCalculator", "metricPluginName" : "prometheus", "username" : "postgres" } 2022-09-08 19:27:04,518 INFO com.dtstack.chunjun.source.DtInputFormatSourceFunction [] - Start initialize input format state, is restored:false 2022-09-08 19:27:04,520 INFO com.dtstack.chunjun.source.DtInputFormatSourceFunction [] - End initialize input format state 2022-09-08 19:27:04,531 INFO com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat [] - sqlServer cdc openInternal split number:0 start... 2022-09-08 19:27:04,677 INFO com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat [] - SqlserverCdcInputFormat[sqlserverCDCToPostgresql2]open: end 2022-09-08 19:27:04,677 INFO com.dtstack.chunjun.connector.sqlservercdc.listener.SqlServerCdcListener [] - SqlServerCdcListener start running..... 
2022-09-08 19:27:04,685 INFO com.dtstack.chunjun.connector.sqlservercdc.inputFormat.SqlServerCdcInputFormat [] - [SqlServerCdcInputFormat] open successfully, inputSplit = GenericSplit (0/1), [SqlServerCdcConf]: { "semantic" : "at-least-once", "databaseName" : "CDCTest", "errorRecord" : 0, "checkFormat" : true, "parallelism" : 1, "executeDdlAble" : false, "errorPercentage" : -1, "flushIntervalMills" : 10000, "fieldNameList" : [ ], "url" : "jdbc:sqlserver://192.168.8.212:1433;databaseName=CDCTest", "pavingData" : true, "password" : "**", "pollInterval" : 1000, "splitUpdate" : false, "cat" : "insert,delete,update", "tableList" : [ "dbo.ETO_PDM_Header" ], "timestampFormat" : "sql", "autoResetConnection" : false, "batchSize" : 1, "autoCommit" : false, "speedBytes" : 0, "rowSizeCalculatorType" : "objectSizeCalculator", "metricPluginName" : "prometheus", "username" : "sa" }
Search before asking
What happened
The job is submitted to Flink normally and runs without any exception, but the data is not synchronized to PostgreSQL. The job configuration is as follows:

CREATE TABLE source (
    id bigint NOT NULL,
    parent_id bigint NOT NULL,
    project_no varchar(50) NOT NULL,
    sales_order varchar(50) NOT NULL,
    plant_code varchar(50) NULL,
    product_line varchar(50) NULL,
    sales_order_item varchar(50) NOT NULL,
    bay_id varchar(50) NULL,
    order_no varchar(50) NOT NULL,
    order_status int NULL,
    order_level int NOT NULL,
    sap_order_status varchar(50) NULL,
    material_no varchar(50) NOT NULL,
    material_descr varchar(255) NULL,
    final_material varchar(50) NULL,
    init_production_center varchar(50) NULL,
    final_production_center varchar(50) NULL,
    module_code varchar(50) NULL,
    material_no_fellow varchar(50) NULL,
    order_no_fellow varchar(50) NULL,
    item_component_list varchar(50) NULL,
    reservation varchar(50) NULL,
    reservation_item varchar(50) NULL,
    material_picking1_times int NULL,
    material_picking2_times int NULL,
    first_date timestamp NULL,
    basic_start_time timestamp NULL,
    basic_finish_time timestamp NULL,
    product_start_time timestamp NULL,
    mes_plan_start_time timestamp NULL,
    mes_plan_end_time timestamp NULL,
    mes_task_exec_time timestamp NULL,
    mes_plan_u_time timestamp NULL,
    is_deleted int NULL,
    is_modify int NULL,
    create_user varchar(50) NULL,
    update_user varchar(50) NULL,
    create_time timestamp NULL,
    update_time timestamp NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlservercdc-x',
    'username' = 'sa',
    'password' = 'sa@123',
    'cat' = 'insert,delete,update',
    'url' = 'jdbc:sqlserver://192.168.8.212:1433;databaseName=CDCTest',
    'table' = 'dbo.ETO_PDM_Header',
    'timestamp-format.standard' = 'SQL',
    'database' = 'CDCTest',
    'poll-interval' = '1000'
);
CREATE TABLE sink (
    id numeric NOT NULL,
    parent_id numeric NOT NULL,
    project_no varchar(50) NOT NULL,
    sales_order varchar(50) NOT NULL,
    plant_code varchar(50) NULL,
    product_line varchar(50) NULL,
    sales_order_item varchar(50) NOT NULL,
    bay_id varchar(50) NULL,
    order_no varchar(50) NOT NULL,
    order_status int NULL,
    order_level int NOT NULL,
    sap_order_status varchar(50) NULL,
    material_no varchar(50) NOT NULL,
    material_descr varchar(255) NULL,
    final_material varchar(50) NULL,
    init_production_center varchar(50) NULL,
    final_production_center varchar(50) NULL,
    module_code varchar(50) NULL,
    material_no_fellow varchar(50) NULL,
    order_no_fellow varchar(50) NULL,
    item_component_list varchar(50) NULL,
    reservation varchar(50) NULL,
    reservation_item varchar(50) NULL,
    material_picking1_times int NULL,
    material_picking2_times int NULL,
    first_date timestamp NULL,
    basic_start_time timestamp NULL,
    basic_finish_time timestamp NULL,
    product_start_time timestamp NULL,
    mes_plan_start_time timestamp NULL,
    mes_plan_end_time timestamp NULL,
    mes_task_exec_time timestamp NULL,
    mes_plan_u_time timestamp NULL,
    is_deleted int NULL,
    is_modify int NULL,
    create_user varchar(50) NULL,
    update_user varchar(50) NULL,
    create_time timestamp NULL,
    update_time timestamp NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgresql-x',
    'url' = 'jdbc:postgresql://192.168.0.116:52345/postgres',
    'table-name' = 'siemens2_ETO_PDM_Header',
    'username' = 'postgres',
    'password' = 'P@stgres123',
    'sink.buffer-flush.max-rows' = '1024',
    'sink.buffer-flush.interval' = '1000',
    'sink.all-replace' = 'true',
    'sink.parallelism' = '1'
);
insert into sink select * from source u;
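Not part of the original report, but for context: the JdbcOutputFormat log above shows the sink writing with INSERT ... ON CONFLICT ("id") DO UPDATE, and PostgreSQL only accepts that clause when the target table has a primary-key or unique constraint on "id". A minimal sanity check on the sink side, assuming the table sits on the default search_path of the postgres database, could be:

-- Confirm the sink table carries a primary-key/unique constraint on "id",
-- as required by the ON CONFLICT ("id") clause in the generated upsert.
SELECT conname, contype
FROM pg_constraint
WHERE conrelid = '"siemens2_ETO_PDM_Header"'::regclass;

-- Check whether any rows have arrived at all.
SELECT count(*) FROM "siemens2_ETO_PDM_Header";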
Job submission script:

sh bin/chunjun-standalone.sh -jobName sqlserverCDCToPostgresql -job sqlserver_cdc_pg.sql
After the job is submitted successfully, Flink runs normally, but the data is simply not synchronized. CDC on the SQL Server database was also configured by following the ChunJun documentation.
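The exact CDC setup commands are not included in the report; as an illustrative sketch only (database and table names taken from the job configuration above), the standard SQL Server steps to enable and verify CDC look roughly like this:

-- Run inside the CDCTest database. CDC also requires SQL Server Agent to be running,
-- otherwise the capture job produces no change records.
EXEC sys.sp_cdc_enable_db;

-- Enable CDC for the source table.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'ETO_PDM_Header',
    @role_name     = NULL;

-- Verify that CDC is active for the database and is tracking the table.
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'CDCTest';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'ETO_PDM_Header';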
What you expected to happen
The data is expected to be synchronized to PostgreSQL, but it is not.
How to reproduce
Follow the steps in the problem description above.
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct