alibaba / canal

Alibaba's MySQL binlog incremental subscription & consumption component
Apache License 2.0

[CREATE TABLE statements sync correctly, but DML does not] Canal-based sync between different databases on the same MySQL instance (same IP and port): MySQL source -> canal.deployer -> canal.adapter -> MySQL target #5039

Open wsm12138 opened 9 months ago

wsm12138 commented 9 months ago

Environment

Issue Description

Sysbench -> MySQL source -> canal.deployer -> canal.adapter -> MySQL target

On the MySQL source, creating tables, inserting data, and creating indexes all work as expected.

On the MySQL target, the tables and indexes are created, but the inserted rows never arrive; neither canal.deployer nor canal.adapter reports any error.
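
For reference, apart from the sysbench tables, the adapter log further below also records a small manual test. Reconstructed from those log entries, the workload on the source schema canal_source amounts to roughly the following (the exact INSERT wording is a guess; the log only shows the resulting rows):

-- run on the source schema canal_source
CREATE TABLE t1 (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(32));
INSERT INTO t1 (name) VALUES ('tom'), ('jerry'), ('jane');

The CREATE statement shows up on the target; the three rows never do.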

Steps to reproduce

canal.deployer configuration
[root@DB2 wsm]# cat canal.deployer/conf/example/instance.properties  | grep -v ^# | grep -v ^$
canal.instance.gtidon=false
canal.instance.master.address=192.168.10.23:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=test
canal.instance.dbPassword=test
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=example
canal.mq.partition=0
canal.instance.multi.stream.on=false
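
Since source and target share one instance and canal.instance.filter.regex is .*\\..*, the deployer also re-captures whatever the adapter replays into canal_target (in the adapter log below every CREATE appears a second time with "database":"canal_target"). If that loop-back is unwanted, the whitelist could presumably be narrowed to the source schema, or the target schema added to the blacklist. A sketch using canal's comma-separated filter syntax, not verified in this setup:

canal.instance.filter.regex=canal_source\\..*
# or keep the broad whitelist and additionally exclude the target schema:
canal.instance.filter.black.regex=mysql\\.slave_.*,canal_target\\..*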

canal.adapter configuration
[root@DB2 canal.adapter]# cat conf/application.yml  | grep -v ^#
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  #srcDataSources:
  #  defaultDS:
  #    url: jdbc:mysql://192.168.10.23:3306/canal_source?useUnicode=true
  #    username: test
  #    password: test
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.10.23:3306/canal_target?useUnicode=true
          jdbc.username: test
          jdbc.password: test
[root@DB2 canal.adapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1 # must match the key configured above
concurrent: true
dbMapping:
  mirrorDb: true
  # database to sync; must be created in advance
  database: canal_source
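
For context, mirrorDb: true puts the rdb adapter into mirror mode: the database named here is replayed one-to-one into the target datasource, and mirror mode appears to assume that the source and target database names match. When the names differ, as with canal_source and canal_target here, the other documented form is an explicit per-table mapping with a schema-qualified targetTable. A sketch modelled on the stock conf/rdb/mytest_user.yml example shipped with the adapter, using the t1 table from the test below; untested here:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: canal_source          # source schema
  table: t1                       # source table
  targetTable: canal_target.t1    # schema-qualified target, since the names differ
  targetPk:
    id: id
  mapAll: true                    # map all columns by name

Each table needs its own mapping file in this form (e.g. another one for sbtest1), so mirror mode is clearly the more convenient option when the database names can be kept identical.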

Expected behaviour

Data should be consistent between the source and target sides.
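
Since both schemas live on the same MySQL instance, the mismatch is easy to confirm with a single query (schema and table names taken from the configuration and logs in this report):

-- row counts diverge: only the DDL reached the target
SELECT
  (SELECT COUNT(*) FROM canal_source.t1) AS source_rows,
  (SELECT COUNT(*) FROM canal_target.t1) AS target_rows;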

Actual behaviour

(screenshot: the rows inserted on the source are missing from the target table)

If there is an exception, please attach the exception trace:

deployer log

2024-01-22 16:52:18.144 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-01-22 16:52:18.901 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2024-01-22 16:52:18.901 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-01-22 16:52:18.906 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2024-01-22 16:52:19.050 [destination = example , address = /192.168.10.23:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-01-22 16:52:19.050 [destination = example , address = /192.168.10.23:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2024-01-22 16:52:21.215 [destination = example , address = /192.168.10.23:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1705913472000] cost : 2155ms , the next step is binlog dump

adapter log

2024-01-22 16:52:28.189 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-01-22 16:52:40.524 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_source","destination":"example","es":1705913504000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE sbtest1(\n  id INTEGER NOT NULL,\n  k INTEGER DEFAULT '0' NOT NULL,\n  c CHAR(255) DEFAULT '' NOT NULL,\n  pad CHAR(60) DEFAULT '' NOT NULL,\n  PRIMARY KEY (id)\n) /*! ENGINE = innodb */","table":"sbtest1","ts":1705913560325,"type":"CREATE"}
2024-01-22 16:52:40.524 [pool-8-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"canal_source","destination":"example","es":1705913504000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE sbtest1(\n  id INTEGER NOT NULL,\n  k INTEGER DEFAULT '0' NOT NULL,\n  c CHAR(255) DEFAULT '' NOT NULL,\n  pad CHAR(60) DEFAULT '' NOT NULL,\n  PRIMARY KEY (id)\n) /*! ENGINE = innodb */","table":"sbtest1","ts":1705913560325,"type":"CREATE"}
2024-01-22 16:52:41.068 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_target","destination":"example","es":1705913504000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE sbtest1(\n  id INTEGER NOT NULL,\n  k INTEGER DEFAULT '0' NOT NULL,\n  c CHAR(255) DEFAULT '' NOT NULL,\n  pad CHAR(60) DEFAULT '' NOT NULL,\n  PRIMARY KEY (id)\n) /*! ENGINE = innodb */","table":"sbtest1","ts":1705913561068,"type":"CREATE"}
2024-01-22 16:52:46.611 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"k":1,"c":"68487932199-96439406143-93774651418-41631865787-96406072701-20604855487-25459966574-28203206787-41238978918-19503783441","pad":"22195207048-70116052123-74140395089-76317954521-98694025897"}],"database":"canal_source","destination":"example","es":1705913510000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"sbtest1","ts":1705913566599,"type":"INSERT"}
2024-01-22 16:52:46.905 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"k":1,"c":"68487932199-96439406143-93774651418-41631865787-96406072701-20604855487-25459966574-28203206787-41238978918-19503783441","pad":"22195207048-70116052123-74140395089-76317954521-98694025897"},"database":"canal_source","destination":"example","old":null,"table":"sbtest1","type":"INSERT"}
2024-01-22 16:52:49.416 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_source","destination":"example","es":1705913513000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE INDEX k_1 ON sbtest1(k)","table":"sbtest1","ts":1705913569416,"type":"CINDEX"}
2024-01-22 16:52:49.417 [pool-8-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"canal_source","destination":"example","es":1705913513000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE INDEX k_1 ON sbtest1(k)","table":"sbtest1","ts":1705913569416,"type":"CINDEX"}
2024-01-22 16:52:49.946 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_target","destination":"example","es":1705913513000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE INDEX k_1 ON sbtest1(k)","table":"sbtest1","ts":1705913569945,"type":"CINDEX"}
2024-01-22 16:53:11.517 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_source","destination":"example","es":1705913535000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32))","table":"t1","ts":1705913591517,"type":"CREATE"}
2024-01-22 16:53:11.518 [pool-8-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"canal_source","destination":"example","es":1705913535000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32))","table":"t1","ts":1705913591517,"type":"CREATE"}
2024-01-22 16:53:12.032 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"canal_target","destination":"example","es":1705913535000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32))","table":"t1","ts":1705913592032,"type":"CREATE"}
2024-01-22 16:53:20.058 [pool-8-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"tom"},{"id":2,"name":"jerry"},{"id":3,"name":"jane"}],"database":"canal_source","destination":"example","es":1705913544000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t1","ts":1705913600057,"type":"INSERT"}
2024-01-22 16:53:20.066 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"tom"},"database":"canal_source","destination":"example","old":null,"table":"t1","type":"INSERT"}
2024-01-22 16:53:20.104 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"jane"},"database":"canal_source","destination":"example","old":null,"table":"t1","type":"INSERT"}
2024-01-22 16:53:20.133 [pool-7-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":2,"name":"jerry"},"database":"canal_source","destination":"example","old":null,"table":"t1","type":"INSERT"}
2024-01-22 16:55:43.224 [http-nio-8081-exec-1] INFO  o.a.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-01-22 16:55:43.225 [http-nio-8081-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
2024-01-22 16:55:43.226 [http-nio-8081-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - Completed initialization in 1 ms
wsm12138 commented 9 months ago

Hope to get your reply soon, thank you

wsm12138 commented 9 months ago

Update on progress

Findings:

  1. Syncing different databases within the same MySQL instance (same IP and port) through Canal: fails. Symptom: DML is not synced, and the adapter log shows no errors.
  2. Syncing across MySQL instances with the same database name through Canal: succeeds. The configuration used is below.
    • adapter
      
[root@DB2 canal.adapter]# cat conf/application.yml   | grep -v ^#
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.10.23:3306/canal_source?useUnicode=true
      username: test
      password: test
  canalAdapters:

[root@DB2 canal.adapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1 # must match the key configured above
concurrent: true
dbMapping:
  mirrorDb: true
  # database to sync; must be created in advance
  database: canal
[root@DB2 canal.adapter]#

Result

(screenshot: in this cross-instance setup the data is synced to the target as expected)
egBean commented 8 months ago

Hi, may I ask how you ended up solving this? I also need to sync between databases with different names within the same instance.