apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

Failed to discover captured tables for enumerator when using Apache ShardingSphere as a CDC source. #1148

Closed nickfan closed 8 months ago

nickfan commented 2 years ago

Describe the bug

Environment :

To Reproduce: when Apache ShardingSphere is used as the CDC data source, an error occurs while the connector tries to enumerate the database tables:

  1. The test data :
  2. The test code :
  3. The error :
    2022-04-27T07:07:42.382588191Z 2022-04-27 07:07:42,382 INFO  com.ververica.cdc.connectors.mysql.MySqlValidator            [] - MySQL validation passed.
    jobmanager_1      | 2022-04-27T07:07:42.382919806Z 2022-04-27 07:07:42,382 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available databases
    jobmanager_1      | 2022-04-27T07:07:42.383941090Z 2022-04-27 07:07:42,383 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -       list of available databases is: [sharding_db]
    jobmanager_1      | 2022-04-27T07:07:42.383960456Z 2022-04-27 07:07:42,383 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available tables in each database
    jobmanager_1      | 2022-04-27T07:07:42.385029769Z 2022-04-27 07:07:42,384 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -       skipping database 'sharding_db' due to error reading tables: No database selected
    jobmanager_1      | 2022-04-27T07:07:42.385757625Z 2022-04-27 07:07:42,385 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
    jobmanager_1      | 2022-04-27T07:07:42.387685947Z 2022-04-27 07:07:42,385 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_spl_queue_log]], fields=[id, org_id, spl_order_id, tracking_code]) -> DropUpdateBefore
    jobmanager_1      | 2022-04-27T07:07:42.387703215Z org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
    jobmanager_1      | 2022-04-27T07:07:42.387707550Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]

Additional Description

ShardingSphere is missing the information_schema database, which provides the metadata for the instance's databases. Maybe that is the reason?

nickfan commented 2 years ago

I upgraded ShardingSphere to 5.1.1, but the same issue still occurs (now with information_schema included):

jobmanager_1      | 2022-05-06T01:54:19.211855085Z 2022-05-06 01:54:19,211 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job insert-into_myhive.default.es_spl_queue_log (3b8fad9f3ed3a1d1d6ba4cb09583f2f9) under job master id 00000000000000000000000000000000.
jobmanager_1      | 2022-05-06T01:54:19.213168090Z 2022-05-06 01:54:19,213 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_spl_queue_log]], fields=[id, org_id, spl_order_id, tracking_code]) -> DropUpdateBefore.
jobmanager_1      | 2022-05-06T01:54:19.219152022Z 2022-05-06 01:54:19,219 INFO  com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools [] - Create and register connection pool 172.16.0.33:3308
jobmanager_1      | 2022-05-06T01:54:19.220544261Z 2022-05-06 01:54:19,220 INFO  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource [] - connection-pool-172.16.0.33:3308 - Starting...
jobmanager_1      | 2022-05-06T01:54:19.306373014Z 2022-05-06 01:54:19,306 INFO  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource [] - connection-pool-172.16.0.33:3308 - Start completed.
jobmanager_1      | 2022-05-06T01:54:19.341044487Z 2022-05-06 01:54:19,340 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
jobmanager_1      | 2022-05-06T01:54:19.341182589Z 2022-05-06 01:54:19,341 INFO  com.ververica.cdc.connectors.mysql.MySqlValidator            [] - MySQL validation passed.
jobmanager_1      | 2022-05-06T01:54:19.348825206Z 2022-05-06 01:54:19,348 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available databases
jobmanager_1      | 2022-05-06T01:54:19.351583669Z 2022-05-06 01:54:19,351 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        list of available databases is: [sharding_db, mysql, information_schema, performance_schema, sys]
jobmanager_1      | 2022-05-06T01:54:19.351597610Z 2022-05-06 01:54:19,351 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available tables in each database
jobmanager_1      | 2022-05-06T01:54:19.369729366Z 2022-05-06 01:54:19,369 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sharding_db' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T01:54:19.371206896Z 2022-05-06 01:54:19,371 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'mysql' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T01:54:19.372318346Z 2022-05-06 01:54:19,372 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'information_schema' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T01:54:19.373209226Z 2022-05-06 01:54:19,373 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'performance_schema' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T01:54:19.374866886Z 2022-05-06 01:54:19,374 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sys' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T01:54:19.376215140Z 2022-05-06 01:54:19,376 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
jobmanager_1      | 2022-05-06T01:54:19.381521176Z 2022-05-06 01:54:19,378 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_spl_queue_log]], fields=[id, org_id, spl_order_id, tracking_code]) -> DropUpdateBefore
jobmanager_1      | 2022-05-06T01:54:19.381544918Z org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
jobmanager_1      | 2022-05-06T01:54:19.381548337Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T01:54:19.381551337Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381554248Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381557585Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381560391Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381563002Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381565617Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381568183Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381570771Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381573286Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381575813Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381579351Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381582075Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381584698Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381587246Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381589746Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381592192Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381604319Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381607039Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381609600Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381612323Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381614900Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381617444Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381619887Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381622342Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381624803Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381627320Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381629931Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381632520Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381635121Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381637742Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381640473Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T01:54:19.381643027Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.spl_queue_log]
jobmanager_1      | 2022-05-06T01:54:19.381645744Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T01:54:19.381648582Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T01:54:19.381651603Z      ... 31 more
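The log above suggests a two-step failure: every database is skipped because listing its tables fails with `No database selected`, so nothing is left to match the configured filter. Below is a minimal Python sketch of that control flow. It is a model inferred from the log messages only, not the connector's actual code; `discover_captured_tables`, `list_tables`, and `proxy_list_tables` are hypothetical names introduced for illustration.

```python
import re
from typing import Callable, Iterable, List


def discover_captured_tables(
    databases: Iterable[str],
    list_tables: Callable[[str], List[str]],
    table_pattern: str,
) -> List[str]:
    """Model of the discovery flow seen in the log (hypothetical helper).

    list_tables(db) returns the table names of one database, or raises
    if the server rejects the listing query.
    """
    matcher = re.compile(table_pattern)
    captured = []
    for db in databases:
        try:
            tables = list_tables(db)
        except Exception as err:
            # Mirrors the WARN lines: the database is skipped, not retried.
            print(f"skipping database '{db}' due to error reading tables: {err}")
            continue
        for table in tables:
            full_name = f"{db}.{table}"
            if matcher.fullmatch(full_name):
                captured.append(full_name)
    if not captured:
        # Mirrors the IllegalArgumentException at the bottom of the trace.
        raise ValueError(
            f"Can't find any matched tables for pattern: {table_pattern}"
        )
    return captured


def proxy_list_tables(db: str) -> List[str]:
    # Stand-in for the proxy behaviour reported in this issue (assumption).
    raise RuntimeError("No database selected")
```

Calling `discover_captured_tables` with the five databases from the log and `proxy_list_tables` skips all of them and raises, while a `list_tables` callback that returns `["spl_queue_log"]` for `sharding_db` yields `["sharding_db.spl_queue_log"]`. If this model is right, the enumerator error is only a symptom and the rejected table listing is the root cause.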


nickfan commented 2 years ago

I've put together the relevant configuration files for reproducing these issues.

Apache ShardingSphere 5.1.1

server.yaml


mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: governance_ds
      server-lists: zookeeper:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500
  overwrite: false

rules:
  - !AUTHORITY
    users:
      - root@%:root
      - sharding@:sharding
    provider:
      type: ALL_PRIVILEGES_PERMITTED
#  - !TRANSACTION
#    defaultType: XA
#    providerType: Atomikos

props:
  max-connections-size-per-query: 1
  kernel-executor-size: 16  # Infinite by default.
  proxy-frontend-flush-threshold: 128  # The default value is 128.
  proxy-opentracing-enabled: false
#  proxy-hint-enabled: false
  sql-show: true
#  check-table-metadata-enabled: false
#  show-process-list-enabled: false
#    # Proxy backend query fetch size. A larger value may increase the memory usage of ShardingSphere Proxy.
#    # The default value is -1, which means set the minimum value for different JDBC drivers.
#  proxy-backend-query-fetch-size: -1
#  check-duplicate-table-enabled: false
#  proxy-frontend-executor-size: 0 # Proxy frontend executor size. The default value is 0, which means let Netty decide.
#    # Available options of proxy backend executor suitable: OLAP(default), OLTP. The OLTP option may reduce time cost of writing packets to client, but it may increase the latency of SQL execution
#    # and block other clients if client connections are more than `proxy-frontend-executor-size`, especially executing slow SQL.
#  proxy-backend-executor-suitable: OLAP
#  proxy-frontend-max-connections: 0 # Less than or equal to 0 means no limitation.
#  sql-federation-enabled: false
#    # Available proxy backend driver type: JDBC (default), ExperimentalVertx
#  proxy-backend-driver-type: JDBC

config-sharding.yaml

schemaName: sharding_db
dataSources:
  ds_0:
    url: jdbc:mysql://mysql:3306/demo_ds_0?serverTimezone=UTC&useSSL=false
    username: root
    password: root
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_1:
    url: jdbc:mysql://mysql:3306/demo_ds_1?serverTimezone=UTC&useSSL=false
    username: root
    password: root
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
rules:
- !SHARDING
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_${0..1}
      tableStrategy:
        standard:
          shardingColumn: org_id
          shardingAlgorithmName: t_order_inline
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    t_order_item:
      actualDataNodes: ds_${0..1}.t_order_item_${0..1}
      tableStrategy:
        standard:
          shardingColumn: org_id
          shardingAlgorithmName: t_order_item_inline
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    t_org_option:
      actualDataNodes: ds_${0..1}.t_org_option
      tableStrategy:
        none:
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    t_global_option:
      actualDataNodes: ds_${0..1}.t_global_option
      tableStrategy:
        none:
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
  bindingTables:
    - t_order
    - t_order_item
    - t_org_option
  broadcastTables:
    - t_global_option
  defaultDatabaseStrategy:
    standard:
      shardingColumn: org_id
      shardingAlgorithmName: database_inline
  defaultTableStrategy:
    none:
  defaultKeyGenerateStrategy:
    column: id
    keyGeneratorName: snowflake
  defaultShardingColumn: org_id
  shardingAlgorithms:
    database_inline:
      type: INLINE
      props:
        algorithm-expression: ds_${org_id % 2}
    t_order_inline:
      type: INLINE
      props:
        algorithm-expression: t_order_${org_id % 2}
    t_order_item_inline:
      type: INLINE
      props:
        algorithm-expression: t_order_item_${org_id % 2}
  keyGenerators:
    snowflake:
      type: SNOWFLAKE
      props:
        worker-id: 123

MySQL source schema init SQL:

CREATE DATABASE IF NOT EXISTS `demo_ds_0` DEFAULT CHARACTER SET = `utf8` DEFAULT COLLATE = `utf8_general_ci`;
CREATE DATABASE IF NOT EXISTS `demo_ds_1` DEFAULT CHARACTER SET = `utf8` DEFAULT COLLATE = `utf8_general_ci`;

USE `demo_ds_0`;

CREATE TABLE IF NOT EXISTS `t_global_option` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `option_name` varchar(255) NOT NULL COMMENT 'option name',
  `option_value` varchar(255) DEFAULT NULL COMMENT 'option value',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `option_name` (`option_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_org_option` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL COMMENT 'organization ID',
  `option_name` varchar(255) NOT NULL COMMENT 'option name',
  `option_value` varchar(255) DEFAULT NULL COMMENT 'option value',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `option_name` (`option_name`,`org_id`),
  KEY `org_id` (`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_0` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_source_platform` varchar(255) DEFAULT NULL COMMENT 'order source platform',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  KEY `org_id` (`org_id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_1` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_source_platform` varchar(255) DEFAULT NULL COMMENT 'order source platform',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  KEY `org_id` (`org_id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_item_0` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'parent order ID',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `sku` varchar(255) NOT NULL COMMENT 'platform SKU',
  `unit` varchar(255) NOT NULL COMMENT 'unit',
  `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT 'quantity',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
  KEY `org_id` (`org_id`),
  KEY `order_id` (`order_id`,`org_id`),
  KEY `created_at` (`created_at`,`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_item_1` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'parent order ID',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `sku` varchar(255) NOT NULL COMMENT 'platform SKU',
  `unit` varchar(255) NOT NULL COMMENT 'unit',
  `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT 'quantity',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
  KEY `org_id` (`org_id`),
  KEY `order_id` (`order_id`,`org_id`),
  KEY `created_at` (`created_at`,`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

USE `demo_ds_1`;

CREATE TABLE IF NOT EXISTS `t_global_option` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `option_name` varchar(255) NOT NULL COMMENT 'option name',
  `option_value` varchar(255) DEFAULT NULL COMMENT 'option value',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `option_name` (`option_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_org_option` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL COMMENT 'organization ID',
  `option_name` varchar(255) NOT NULL COMMENT 'option name',
  `option_value` varchar(255) DEFAULT NULL COMMENT 'option value',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `option_name` (`option_name`,`org_id`),
  KEY `org_id` (`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_0` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_source_platform` varchar(255) DEFAULT NULL COMMENT 'order source platform',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  KEY `org_id` (`org_id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_1` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_source_platform` varchar(255) DEFAULT NULL COMMENT 'order source platform',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  KEY `org_id` (`org_id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_item_0` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'parent order ID',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `sku` varchar(255) NOT NULL COMMENT 'platform SKU',
  `unit` varchar(255) NOT NULL COMMENT 'unit',
  `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT 'quantity',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
  KEY `org_id` (`org_id`),
  KEY `order_id` (`order_id`,`org_id`),
  KEY `created_at` (`created_at`,`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `t_order_item_1` (
  `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
  `org_id` int(11) NOT NULL DEFAULT '0' COMMENT 'organization ID',
  `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'parent order ID',
  `order_source_no` varchar(255) DEFAULT NULL COMMENT 'order source number',
  `sku` varchar(255) NOT NULL COMMENT 'platform SKU',
  `unit` varchar(255) NOT NULL COMMENT 'unit',
  `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT 'quantity',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
  KEY `org_id` (`org_id`),
  KEY `order_id` (`order_id`,`org_id`),
  KEY `created_at` (`created_at`,`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Mock data SQL for the ShardingSphere logical database 'sharding_db':


USE `sharding_db`;

-- t_global_option table
INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k1', 'v1');
INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k2', 'v2');
INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k3', 'v3');

-- t_org_option table
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k1', 'v1');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k2', 'v2');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k3', 'v3');

INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k1', 'v1');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k2', 'v2');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k3', 'v3');

INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k1', 'v1');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k2', 'v2');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k3', 'v3');

INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k1', 'v1');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k2', 'v2');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k3', 'v3');

INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k1', 'v1');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k2', 'v2');
INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k3', 'v3');

INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so1');

INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so2');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so3');

INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so2');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so3');

INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so2');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so3');

INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so1');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so2');
INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so3');

Flink version 1.13.3, CDC connector version 2.1.0

Flink SQL MySQL CDC source and Elasticsearch sink:


CREATE TABLE IF NOT EXISTS cdc_init_ds_t_order (
  id BIGINT,
  org_id INTEGER,
  order_source_platform VARCHAR(255),
  order_source_no VARCHAR(255),
  created_at TIMESTAMP(0),
  updated_at TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'scan.startup.mode' = 'initial',
  'hostname' = '172.16.0.33',
  'port' = '3308',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'sharding_db',
  'table-name' = 't_order'
);

CREATE TABLE IF NOT EXISTS es_ds_t_order (
  id BIGINT,
  org_id INTEGER,
  order_source_platform VARCHAR(255),
  order_source_no VARCHAR(255),
  created_at TIMESTAMP(0),
  updated_at TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',
  'index' = 't_order'
);

Flink SQL job that syncs the MySQL CDC source to the Elasticsearch sink:

INSERT INTO es_ds_t_order SELECT * FROM cdc_init_ds_t_order;

Flink console log:

jobmanager_1      | 2022-05-06T03:41:18.091783496Z 2022-05-06 03:41:18,091 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 6f4d2fb809cd27113aa5a210af958eca (insert-into_myhive.default.es_ds_t_order).
jobmanager_1      | 2022-05-06T03:41:18.092316247Z 2022-05-06 03:41:18,092 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 6f4d2fb809cd27113aa5a210af958eca (insert-into_myhive.default.es_ds_t_order).
jobmanager_1      | 2022-05-06T03:41:18.115033279Z 2022-05-06 03:41:18,114 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_17 .
jobmanager_1      | 2022-05-06T03:41:18.115982107Z 2022-05-06 03:41:18,115 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
jobmanager_1      | 2022-05-06T03:41:18.117063069Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=180000,backoffTimeMS=180000,maxFailuresPerInterval=4) for insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
jobmanager_1      | 2022-05-06T03:41:18.117757348Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
jobmanager_1      | 2022-05-06T03:41:18.117771898Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
jobmanager_1      | 2022-05-06T03:41:18.146655076Z 2022-05-06 03:41:18,146 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
jobmanager_1      | 2022-05-06T03:41:18.147124380Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@749f20e
jobmanager_1      | 2022-05-06T03:41:18.147162910Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@b868bf4
jobmanager_1      | 2022-05-06T03:41:18.147354271Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://namenode:8020/flink/checkpoints")
jobmanager_1      | 2022-05-06T03:41:18.188205038Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
jobmanager_1      | 2022-05-06T03:41:18.188312206Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@7077d08 for insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
jobmanager_1      | 2022-05-06T03:41:18.188662958Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) under job master id 00000000000000000000000000000000.
jobmanager_1      | 2022-05-06T03:41:18.189276971Z 2022-05-06 03:41:18,189 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
jobmanager_1      | 2022-05-06T03:41:18.193002757Z 2022-05-06 03:41:18,192 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7590c18e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.197250937Z 2022-05-06 03:41:18,196 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@58d6122f (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.198771011Z 2022-05-06 03:41:18,198 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@301caba0 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.200223069Z 2022-05-06 03:41:18,200 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@4373809c (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.201675640Z 2022-05-06 03:41:18,201 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@4413048f (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.203528540Z 2022-05-06 03:41:18,203 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@2cbc3148 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.204849069Z 2022-05-06 03:41:18,204 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7928ded4 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.206113172Z 2022-05-06 03:41:18,206 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@2600d60d (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.216072540Z 2022-05-06 03:41:18,215 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7a6b893e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.217582928Z 2022-05-06 03:41:18,217 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3790d14a (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.220663460Z 2022-05-06 03:41:18,220 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@8f17712 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.221933286Z 2022-05-06 03:41:18,221 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@28e168bf (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.223072751Z 2022-05-06 03:41:18,222 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@e6536e4 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.224661737Z 2022-05-06 03:41:18,224 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3fcebce3 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.225772747Z 2022-05-06 03:41:18,225 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3c8415ef (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.227405302Z 2022-05-06 03:41:18,227 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3876e142 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.228714051Z 2022-05-06 03:41:18,228 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@72157b0b (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.231013816Z 2022-05-06 03:41:18,230 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6e2223cb (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.232069357Z 2022-05-06 03:41:18,232 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6815a9f9 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.235841071Z 2022-05-06 03:41:18,235 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@67663948 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
jobmanager_1      | 2022-05-06T03:41:18.315131903Z 2022-05-06 03:41:18,314 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
jobmanager_1      | 2022-05-06T03:41:18.315290827Z 2022-05-06 03:41:18,315 INFO  com.ververica.cdc.connectors.mysql.MySqlValidator            [] - MySQL validation passed.
jobmanager_1      | 2022-05-06T03:41:18.315798138Z 2022-05-06 03:41:18,315 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available databases
jobmanager_1      | 2022-05-06T03:41:18.316828871Z 2022-05-06 03:41:18,316 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        list of available databases is: [sharding_db, mysql, information_schema, performance_schema, sys]
jobmanager_1      | 2022-05-06T03:41:18.316848004Z 2022-05-06 03:41:18,316 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available tables in each database
jobmanager_1      | 2022-05-06T03:41:18.341224657Z 2022-05-06 03:41:18,341 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sharding_db' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T03:41:18.342691048Z 2022-05-06 03:41:18,342 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'mysql' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T03:41:18.343667712Z 2022-05-06 03:41:18,343 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'information_schema' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T03:41:18.344465873Z 2022-05-06 03:41:18,344 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'performance_schema' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T03:41:18.345258206Z 2022-05-06 03:41:18,345 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sys' due to error reading tables: No database selected
jobmanager_1      | 2022-05-06T03:41:18.346396216Z 2022-05-06 03:41:18,346 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
jobmanager_1      | 2022-05-06T03:41:18.347640778Z 2022-05-06 03:41:18,346 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore
jobmanager_1      | 2022-05-06T03:41:18.347663736Z org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
jobmanager_1      | 2022-05-06T03:41:18.347668994Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.347674499Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347679133Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347683960Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347690047Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347695171Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347702122Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347706608Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347710381Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347714329Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347718472Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347724528Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347729888Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347733598Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347737241Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347741029Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347745238Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347762754Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347767108Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347771037Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347774681Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347778529Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347782516Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347786140Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347791771Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347795948Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347799511Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347803973Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347808945Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347813429Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347818245Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347825951Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.347830790Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.t_order]
jobmanager_1      | 2022-05-06T03:41:18.347834971Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.347839264Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.347843703Z      ... 31 more
jobmanager_1      | 2022-05-06T03:41:18.347847624Z 2022-05-06 03:41:18,347 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
jobmanager_1      | 2022-05-06T03:41:18.347852911Z 2022-05-06 03:41:18,347 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state CREATED to RUNNING.
jobmanager_1      | 2022-05-06T03:41:18.348142922Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from CREATED to SCHEDULED.
jobmanager_1      | 2022-05-06T03:41:18.348157054Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from CREATED to SCHEDULED.
jobmanager_1      | 2022-05-06T03:41:18.348674695Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
jobmanager_1      | 2022-05-06T03:41:18.349373848Z 2022-05-06 03:41:18,349 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
jobmanager_1      | 2022-05-06T03:41:18.350244971Z 2022-05-06 03:41:18,349 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
jobmanager_1      | 2022-05-06T03:41:18.350260239Z org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore' (operator cbc357ccb763df2852fee8c4fc7d55f2).
jobmanager_1      | 2022-05-06T03:41:18.350266930Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350272463Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350276072Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350279092Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:132) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350281885Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350285118Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350288023Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350291058Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350293962Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350306268Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350309420Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350313763Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350316686Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350319330Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350321993Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350324769Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350327411Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350330302Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350333090Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350335686Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350338336Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350341798Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350344360Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350346964Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350349746Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350352767Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350355517Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350358205Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350361353Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350363931Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350366514Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350371921Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350374603Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350377633Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350380439Z Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
jobmanager_1      | 2022-05-06T03:41:18.350383227Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.350386109Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350388798Z      ... 30 more
jobmanager_1      | 2022-05-06T03:41:18.350391575Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.t_order]
jobmanager_1      | 2022-05-06T03:41:18.350395685Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.350399407Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
jobmanager_1      | 2022-05-06T03:41:18.350403720Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
jobmanager_1      | 2022-05-06T03:41:18.350407603Z      ... 30 more
jobmanager_1      | 2022-05-06T03:41:18.350410381Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_17 for job 6f4d2fb809cd27113aa5a210af958eca.
jobmanager_1      | 2022-05-06T03:41:18.350413912Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state RUNNING to RESTARTING.
jobmanager_1      | 2022-05-06T03:41:18.350712711Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from SCHEDULED to CANCELING.
jobmanager_1      | 2022-05-06T03:41:18.350736699Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from CANCELING to CANCELED.
jobmanager_1      | 2022-05-06T03:41:18.350820209Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 1d29c53fdde1442b97a7dec0aa8650c1.
jobmanager_1      | 2022-05-06T03:41:18.350837088Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from SCHEDULED to CANCELING.
jobmanager_1      | 2022-05-06T03:41:18.350853127Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from CANCELING to CANCELED.
jobmanager_1      | 2022-05-06T03:41:18.350888673Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 529eade9aab81d77b0dcce25d963b98a.
jobmanager_1      | 2022-05-06T03:41:18.351132167Z 2022-05-06 03:41:18,351 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_17 for job 6f4d2fb809cd27113aa5a210af958eca.
jobmanager_1      | 2022-05-06T03:41:18.388171957Z 2022-05-06 03:41:18,388 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
jobmanager_1      | 2022-05-06T03:41:18.388349125Z 2022-05-06 03:41:18,388 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 6f4d2fb809cd27113aa5a210af958eca
jobmanager_1      | 2022-05-06T03:44:18.352081613Z 2022-05-06 03:44:18,351 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state RESTARTING to RUNNING.
jobmanager_1      | 2022-05-06T03:44:18.352196316Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
jobmanager_1      | 2022-05-06T03:44:18.352229069Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Resetting the Operator Coordinators to an empty state.
jobmanager_1      | 2022-05-06T03:44:18.352240632Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
jobmanager_1      | 2022-05-06T03:44:18.352655863Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (ee9671e9cae75dc457921722d8b71100) switched from CREATED to SCHEDULED.
jobmanager_1      | 2022-05-06T03:44:18.352668293Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (492ecc6a9fe9a7ef92dbd5af27a4cab8) switched from CREATED to SCHEDULED.
jobmanager_1      | 2022-05-06T03:44:18.352752881Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
jobmanager_1      | 2022-05-06T03:44:18.352835438Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore closed.
jobmanager_1      | 2022-05-06T03:44:18.352933577Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
jobmanager_1      | 2022-05-06T03:44:18.353052378Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 6f4d2fb809cd27113aa5a210af958eca: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]

PatrickRen commented 8 months ago

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!