alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.55k stars 7.62k forks source link

canal集成rabbitmq时topic配置问题, rabbitmq中没有收到消息 #5041

Open yuzaoyah opened 10 months ago

yuzaoyah commented 10 months ago

Question

环境: canal 1.1.7 rabbitmq 3.12 mysql 5.7

使用canal集成rabbitmq,出现了服务无法启动的情况,也出现了rabbitmq接收不到消息的情况

操作流程

canal原始配置文件

conf/canal.properties

##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

conf/example/instance.properties

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*

根据配置文件填充配置

conf/canal.properties

rabbitmq.host = 192.168.1.211:5673
rabbitmq.virtual.host = /sync_test
rabbitmq.exchange = canal.direct
rabbitmq.username = sync_test
rabbitmq.password = sync_test
rabbitmq.deliveryMode=direct 

conf/example/instance.properties

# mq config
canal.mq.topic=canal.routing_key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*

启动时报错: 2024-01-26 01:49:54.701 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2024-01-26 01:49:54.933 [main] ERROR com.alibaba.otter.canal.deployer.CanalLauncher - ## Something goes wrong when starting up the canal Server: java.lang.NullPointerException: null at com.rabbitmq.client.impl.ChannelN.validateQueueNameLength(ChannelN.java:1606) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:960) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) at com.alibaba.otter.canal.connector.rabbitmq.producer.CanalRabbitMQProducer.init(CanalRabbitMQProducer.java:75) at com.alibaba.otter.canal.connector.core.spi.ProxyCanalMQProducer.init(ProxyCanalMQProducer.java:31) at com.alibaba.otter.canal.deployer.CanalStarter.start(CanalStarter.java:72) at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:124)

查找issue修改

根据上述报错内容以及参考 #4915 修改配置如下

conf/canal.properties

rabbitmq.host = 192.168.1.211:5673
rabbitmq.virtual.host = /sync_test
rabbitmq.exchange = canal.direct
rabbitmq.username = sync_test
rabbitmq.password = sync_test
rabbitmq.deliveryMode=direct 
rabbitmq.queue = xxx
rabbitmq.routingKey = canal.routing_key

conf/example/instance.properties

# mq config
canal.mq.topic=canal.routing_key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*

此时收到消息,同样的问题:

  1. 此处为什么需要配置rabbitmq.queue,交换机和路由键唯一确定队列
  2. rabbitmq.routingKey与canal.mq.topic两者区别在哪,如为同一配置,是否多余? 若将rabbitmq.routingKey注释,则会报错,注释canal.mq.topic同理 2024-01-26 01:58:30.823 [main] ERROR com.alibaba.otter.canal.deployer.CanalLauncher - ## Something goes wrong when starting up the canal Server: java.lang.IllegalStateException: Invalid configuration: 'routingKey' must be non-null. at com.rabbitmq.client.impl.AMQImpl$Queue$Bind.(AMQImpl.java:2201) at com.rabbitmq.client.AMQP$Queue$Bind$Builder.build(AMQP.java:870) at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:1076) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueBind(AutorecoveringChannel.java:401) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueBind(AutorecoveringChannel.java:396) at com.alibaba.otter.canal.connector.rabbitmq.producer.CanalRabbitMQProducer.init(CanalRabbitMQProducer.java:78) at com.alibaba.otter.canal.connector.core.spi.ProxyCanalMQProducer.init(ProxyCanalMQProducer.java:31) at com.alibaba.otter.canal.deployer.CanalStarter.start(CanalStarter.java:72) at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:124)

若上述两个topic配置的内容不同,则rabbitmq收不到消息

agapple commented 9 months ago

可以考虑提交一个PR给我

aitxiaogang commented 2 days ago

遇到相同问题,换成1.1.6版本没问题,希望1.1.7版本能修复这个问题