alibaba / canal

Alibaba MySQL binlog incremental subscription & consumption component
Apache License 2.0
28.34k stars 7.59k forks

Oversized MQ messages when using canal to subscribe to a single table in a single database #3779

Open YouAnCao opened 3 years ago

YouAnCao commented 3 years ago

Question

binlog format/image check

canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

binlog ddl isolation

canal.instance.get.ddl.isolation = false

* example/example.properties
```properties
canal.instance.filter.regex=db\\.t_vehicle
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=oms-canal-default-dev
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=oms-vehicle-notify-dev:db\\.t_vehicle
canal.mq.partition=0

```

canal error log:

```
2021-09-09 17:04:02.578 [pool-4-thread-1] ERROR c.a.o.canal.connector.kafka.producer.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 5484200 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 5484200 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at com.alibaba.otter.canal.connector.kafka.producer.CanalKafkaProducer.send(CanalKafkaProducer.java:184) ~[na:na]
        at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:181) [canal.server-1.1.5.jar:na]
        at com.alibaba.otter.canal.server.CanalMQStarter.access$100(CanalMQStarter.java:25) [canal.server-1.1.5.jar:na]
        at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:223) [canal.server-1.1.5.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_172]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 5484200 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[na:na]
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[na:na]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[na:na]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[na:na]
        at com.alibaba.otter.canal.connector.kafka.producer.CanalKafkaProducer.produce(CanalKafkaProducer.java:268) ~[na:na]
        at com.alibaba.otter.canal.connector.kafka.producer.CanalKafkaProducer.send(CanalKafkaProducer.java:261) ~[na:na]
        at com.alibaba.otter.canal.connector.kafka.producer.CanalKafkaProducer.lambda$send$0(CanalKafkaProducer.java:156) ~[na:na]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_172]
        ... 3 common frames omitted
```
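For context: Kafka's `RecordTooLargeException` fires when a single serialized `ProducerRecord` exceeds the producer's `max.request.size` (1 MB by default). Canal can pack all row-change entries of one fetched batch into a single record, and in ROW format a single entry with large TEXT/BLOB columns carries full row images, so even one subscribed table can overflow the limit. A rough back-of-the-envelope sketch (the batch size of 50 and the per-entry figure are assumptions derived only from the numbers in the log above):

```java
// Rough estimate of why one table can exceed max.request.size.
// All concrete numbers except the exception's 5484200 are hypothetical.
public class MessageSizeEstimate {
    public static void main(String[] args) {
        long maxRequestSize = 1_048_576;    // Kafka producer default max.request.size (1 MB)
        long observed = 5_484_200;          // size reported by RecordTooLargeException above
        int entriesPerBatch = 50;           // canal's default fetch batch size (assumption)

        // If one record holds a whole batch, each entry averages ~107 KB here,
        // plausible for ROW-format events with full before/after row images:
        long bytesPerEntry = observed / entriesPerBatch;
        System.out.println("approx. bytes per entry: " + bytesPerEntry);
        System.out.println("exceeds default limit: " + (observed > maxRequestSize));
    }
}
```

So the record can be 5x over the default limit without any misconfiguration of the table filter itself.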
YouAnCao commented 3 years ago

A question: why does subscribing to a single table produce messages that exceed the MQ maximum message size?

nielizixiu commented 3 years ago

Check whether the MQ is unconnected or unreachable; I ran into this problem before when using kafka mode.

YouAnCao commented 3 years ago

> Check whether the MQ is unconnected or unreachable; I ran into this problem before when using kafka mode.

No, kafka is reachable. It just reliably reproduces after running for a while.

dingyufei615 commented 2 years ago

The table has large columns. Increase the batch size the kafka producer sends with. For reference, here is what I'm currently using:

#kafka cluster address
canal.mq.servers = xxxxx
#retry count on send failure, default 0
canal.mq.retries = 1
## in flatMessage mode this can be raised, but do not exceed the MQ message body limit of 10240000
canal.mq.batchSize = 81920
canal.mq.maxRequestSize = 163840000
# maps to Kafka's ProducerConfig.LINGER_MS_CONFIG; for flatMessage format a larger value (50~200) is recommended
canal.mq.lingerMs = 500
canal.mq.bufferMemory = 33554432
# Canal's batch size, default 50K; given kafka's maximum message body limit, do not exceed 1M (stay under 900K)
canal.mq.canalBatchSize = 150
# timeout for canal get, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 200
canal.mq.parallelThreadSize = 16
# whether to use the flat json message format
canal.mq.flatMessage = true
canal.mq.compressionType = none
# whether kafka message delivery uses transactions
canal.mq.acks = 1

#meaningless for kafka; for rocketMQ this is the ProducerGroup name
canal.mq.producerGroup = test
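The key setting above for this error is `canal.mq.maxRequestSize`. As a sketch, these canal keys appear to map onto the standard Kafka producer properties of the same meaning (the mapping is my reading of `CanalKafkaProducer`, not something confirmed in this thread; treat it as an assumption):

```java
import java.util.Properties;

// Sketch: the raw Kafka producer settings that the canal.mq.* keys above
// presumably map onto. The property names are real Kafka producer configs;
// the canal-to-Kafka mapping in the comments is an assumption.
public class ProducerSettings {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("max.request.size", "163840000"); // canal.mq.maxRequestSize — must exceed the largest record
        p.put("batch.size", "81920");           // canal.mq.batchSize
        p.put("linger.ms", "500");              // canal.mq.lingerMs
        p.put("buffer.memory", "33554432");     // canal.mq.bufferMemory
        p.put("compression.type", "none");      // canal.mq.compressionType
        p.put("acks", "1");                     // canal.mq.acks
        p.put("retries", "1");                  // canal.mq.retries
        System.out.println(p.getProperty("max.request.size"));
    }
}
```

Note that raising `max.request.size` on the producer is not enough by itself: the broker-side `message.max.bytes` (and the topic-level `max.message.bytes`) must also allow records of that size, or the broker will reject them instead.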
Felix0829 commented 2 years ago

When this oversized-field error occurs, can the current msg be printed out, so we can tell which schema/table it came from?
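One way to get that information without patching canal's producer is to size-check the serialized payload before (or instead of) relying on the Kafka exception. A minimal sketch, not canal's actual code: `checkAndLog`, the 1 MB limit, and the logging format are all hypothetical, but flatMessage JSON does carry `database` and `table` fields, so a prefix of the payload usually identifies the source table:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical guard: measure a flatMessage payload's UTF-8 size and log
// enough of its head to identify the originating schema/table.
public class OversizeGuard {
    static final int MAX_REQUEST_SIZE = 1_048_576; // assumed producer default (1 MB)

    static boolean checkAndLog(String topic, String flatJson) {
        int bytes = flatJson.getBytes(StandardCharsets.UTF_8).length;
        if (bytes > MAX_REQUEST_SIZE) {
            // The JSON head contains "database" and "table", which is usually
            // enough to pinpoint the offending table.
            System.err.printf("oversized record for topic %s: %d bytes, head=%s%n",
                    topic, bytes, flatJson.substring(0, Math.min(200, flatJson.length())));
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(checkAndLog("oms-canal-default-dev",
                "{\"database\":\"db\",\"table\":\"t_vehicle\"}"));
    }
}
```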