alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.47k stars 7.61k forks source link

canal-adapter从kafka消费数据一定要配置源数据库信息吗 #5314

Open hcb1987 opened 2 days ago

hcb1987 commented 2 days ago

Question

我有一个需求就是两个不同网络的mysql需要同步数据,如果要做主从就需要把mysql端口映射到外网,我不想把数据库映射出来,想通过mysql同步至canal-deployer,canal-deployer同步kafka,canal-adapter消费同步至另外一台mysql这种方式同步,canal-adapter部署再从机mysql网络,和源MySQL网络不通,canal-adapter一定要配置源数据库的连接信息吗? 我的配置如下,这样不能同步吗 `server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null

canal.conf: mode: kafka #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: -1 timeout: accessKey: secretKey: consumerProperties:

canal tcp consumer

canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 192.168.126.129:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 192.168.126.129:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

canalAdapters:

启动后一直有一个告警 2024-10-30 07:37:15.011 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. 2024-10-30 07:37:15.985 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. 2024-10-30 07:37:16.933 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. 我配置了远端的kafka地址怎么还要读本地的kafka呢,这个要怎么配置

callmedba commented 2 days ago

试试在 canal-deployer或者kafka资源所在节点部署一个 otter 是不是就可以干这个事情 ?

https://github.com/alibaba/otter

hcb1987 commented 2 days ago

之前kafka的报错解决了 是因为kafka的监听地址 没有配置 ,配置了ip地址之后可以连接了,但是现在报了个新的错就是再canal-adapter同步数据的时候报错如下: 2024-10-30 12:13:04.238 [Thread-3] INFO o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=6a4270, groupId=g1] Successfully joined group with generation 4 2024-10-30 12:13:04.242 [Thread-3] INFO o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=6a4270, groupId=g1] Adding newly assigned partitions: example-0 2024-10-30 12:13:04.264 [Thread-3] INFO o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=6a4270, groupId=g1] Setting offset for partition example-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.126.129:9092 (id: 0 rack: null), epoch=-1}} 2024-10-30 12:13:05.057 [Thread-3] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=6a4270, groupId=g1] Seeking to offset 1 for partition example-0 2024-10-30 12:13:05.077 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - illegal fieldName input, offset 1, character , line 1, column 2, fastjson-version 2.0.31 6 binlog.000005�� UTF-80�ٽ�28BJPK �j binlog.000005�� UTF-80�ٽ�28BJt1PGX3Z/insert into t1(id) values (1),(2),(3),(34)� binlog.000005�� UTF-80�ٽ�28BhucbJt1P7Xb rowsCount4�Pid (0B1Rintid (0B2Rintid (0B3Rintid (0B34Rint9 binlog.000005�� UTF-80�ٽ�28BJP4410 Error sync and rollback, execute times: 1 XshellXshellXshellXshell2024-10-30 12:13:05.600 [Thread-3] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=6a4270, groupId=g1] Seeking to offset 1 for partition example-0 2024-10-30 12:13:05.621 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - illegal fieldName input, offset 1, character , line 1, column 2, fastjson-version 2.0.31 6 binlog.000005�� UTF-80�ٽ�28BJPK �j binlog.000005�� UTF-80�ٽ�28BJt1PGX3Z/insert into t1(id) values (1),(2),(3),(34)� binlog.000005�� UTF-80�ٽ�28BhucbJt1P7Xb rowsCount4�Pid (0B1Rintid (0B2Rintid (0B3Rintid (0B34Rint9 binlog.000005�� UTF-80�ٽ�28BJP4410 Error sync and rollback, execute times: 2 XshellXshellXshellXshell2024-10-30 12:13:06.130 [Thread-3] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=6a4270, groupId=g1] Seeking to offset 1 for partition example-0 2024-10-30 12:13:06.132 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - illegal fieldName input, offset 1, character , line 1, column 2, fastjson-version 2.0.31 6 binlog.000005�� UTF-80�ٽ�28BJPK �j binlog.000005�� UTF-80�ٽ�28BJt1PGX3Z/insert into t1(id) values (1),(2),(3),(34)� binlog.000005�� UTF-80�ٽ�28BhucbJt1P7Xb rowsCount4�Pid (0B1Rintid (0B2Rintid (0B3Rintid (0B34Rint9 binlog.000005�� UTF-80�ٽ�28BJP4410 Error sync and rollback, execute times: 3 XshellXshellXshellXshell2024-10-30 12:13:06.652 [Thread-3] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=6a4270, groupId=g1] Seeking to offset 1 for partition example-0 2024-10-30 12:13:06.656 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - illegal fieldName input, offset 1, character , line 1, column 2, fastjson-version 2.0.31 6 binlog.000005�� UTF-80�ٽ�28BJPK �j binlog.000005�� UTF-80�ٽ�28BJt1PGX3Z/insert into t1(id) values (1),(2),(3),(34)� binlog.000005�� UTF-80�ٽ�28BhucbJt1P7Xb rowsCount4�Pid (0B1Rintid (0B2Rintid (0B3Rintid (0B34Rint9 binlog.000005�� UTF-80�ٽ�28BJP4410 Error sync and rollback, execute times: 4

hcb1987 commented 2 days ago

@callmedba otter解决不了这个问题吧 两个网络之间只有kafka是通的 只能通过kafka去传输数据,otter不能读取kafka的数据吧

hcb1987 commented 2 days ago

rdb配置文件如下:

Mirror schema synchronize config

dataSourceKey: defaultDS destination: example groupId: g1 outerAdapterKey: mysql1 concurrent: true dbMapping: mirrorDb: true database: hucb