It's probably wrong Kafka connection parameters. Try setting partition: to 0, or leaving it empty.
Tried both; same error.
Check what IP Kafka's listeners=PLAINTEXT:// is set to, and use that ip+port for servers:. Don't use 127.0.0.1.
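A quick way to verify this, sketched for a typical install (the paths and addresses below are assumptions; adjust to your own layout):

# show which address the broker actually binds/advertises
grep -E '^(advertised\.)?listeners' /data/kafka/config/server.properties
# confirm that address is reachable from the host running canal (nc must be installed)
nc -vz 192.168.0.210 9092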
I configured listeners=PLAINTEXT://192.168.0.210:9092 and ran into the same problem.
Try setting servers: 192.168.0.210:9092 in the kafka yml.
I changed it and restarted both Kafka and canal; still the same error. The updated Kafka config is below (Kafka and canal are on the same host, local IP 192.168.175.155):

cd /data/kafka
cp config/server.properties config/server.properties.$(date +%Y%m%d%H%M%S)
cat >config/server.properties<<EOF
broker.id=0
listeners=PLAINTEXT://192.168.175.155:9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=30000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=30000
group.initial.rebalance.delay.ms=0
EOF
cd /data/canal/
cp conf/example/instance.properties conf/example/instance.properties.$(date +%Y%m%d%H%M%S)
cat >conf/example/instance.properties<<EOF
canal.instance.mysql.slaveId=175140
canal.instance.gtidon=false
canal.instance.master.address=192.168.175.140:3406
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=canalpwd
canal.instance.connectionCharset=utf-8
canal.instance.defaultDatabaseName=db002
canal.instance.enableDruid=false
canal.instance.filter.regex=db002.table002;
canal.instance.filter.black.regex=
EOF
cp conf/canal.properties conf/canal.properties.$(date +%Y%m%d%H%M%S)
sed -i '/canal.id/c canal.id=1' conf/canal.properties
sed -i '/canal.ip/c canal.ip=192.168.175.155' conf/canal.properties
sed -i '/canal.port/c canal.port=11111' conf/canal.properties
sed -i '/canal.metrics.pull.port/c canal.metrics.pull.port=11112' conf/canal.properties
sed -i '/canal.zkServers/c canal.zkServers=192.168.175.155:2181' conf/canal.properties
sed -i '/canal.withoutNetty/c canal.withoutNetty=true' conf/canal.properties
sed -i '/canal.serverMode/c canal.serverMode=kafka' conf/canal.properties
sed -i '/canal.destinations/c canal.destinations=example' conf/canal.properties
sed -i '/canal.instance.tsdb.spring.xml/c #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml' conf/canal.properties
cat conf/canal.properties | grep -n ^[^#]
cp conf/kafka.yml conf/kafka.yml.$(date +%Y%m%d%H%M%S)
cat >conf/kafka.yml<<EOF
servers: 192.168.175.155:9092
retries: 0
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
canalBatchSize: 50
canalGetTimeout: 100
flatMessage: true
canalDestinations:
After upgrading canal to the latest version I started seeing this error too; let me try downgrading.
Which Kafka version are you all running? It may not match the Kafka connector version bundled inside canal.
@xiukaiyu @rewerma Found the cause of the error: earlier versions accepted the file name kafka.yml, but the new version only accepts mq.yml.
Wow, that was it: renaming kafka.yml to mq.yml fixed it. Thanks a lot!
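For anyone landing here, the fix is just a rename plus a restart (paths assume the /data/canal layout used earlier in this thread):

cd /data/canal
mv conf/kafka.yml conf/mq.yml        # newer versions only look for mq.yml
sh bin/stop.sh && sh bin/startup.sh  # restart canal so it re-reads the MQ config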
@rewerma @Jerryd99 Thanks to you both! 🎉
@xiukaiyu @rewerma @KeshawnZhen @Jerryd99 @frew I'm running into this problem too, but I can't find the kafka.yml you're talking about.
The latest versions have removed the yml files entirely; everything now lives in canal.properties and instance.properties, in order to support dynamic auto scan.
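Roughly, the old kafka.yml/mq.yml settings now map onto properties keys (the values below are illustrative, taken from the configs posted later in this thread):

# conf/canal.properties: server mode, broker address, producer tuning
canal.serverMode = kafka
canal.mq.servers = 192.168.175.155:9092
canal.mq.flatMessage = true
# conf/example/instance.properties: per-instance topic/partition routing
canal.mq.topic=example
canal.mq.partition=0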
We're now hitting the same problem, canal version 1.1.2. On HDP we configured plaintext:localhost:6667, and I checked Kafka's server.properties: the IPs are each host's own address (3 brokers), so the config looks correct. But after starting Kafka and canal, the main node's nohup.txt fills with 'Connection to node XXX could not be established. Broker may not be available.' How did you all solve this?
@i-love-doufunao In conf/canal.properties, canal.mq.servers = 127.0.0.1:6667 needs to be set to Kafka's actual ip:9092. That's what mine was, and changing it fixed it.
That solved my problem, thanks. The official manual never mentions this. Another pitfall.
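Following the sed style used earlier in this thread, that fix is a one-liner (the broker address is an example; substitute your own):

sed -i '/canal.mq.servers/c canal.mq.servers = 192.168.175.155:9092' conf/canal.properties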
I'm hitting the same problem. Rough setup:
172.20.10.14 mysql
172.20.10.15 canal (posing as a mysql slave)
172.20.10.16 canal-client receiving messages from canal; this tested fine, then I switched over to the kafka config
172.20.10.17 kafka; sending and receiving messages via the command line works fine
But as soon as I start canal with sh bin/startup.sh, the log fills with errors:

2019-08-04 19:37:07.934 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^...$
2019-08-04 19:37:07.935 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-08-04 19:37:07.942 [destination = metrics , address = null , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - parse events has an error
com.alibaba.otter.canal.parse.exception.CanalParseException: illegal connection is null
2019-08-04 19:37:07.945 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-08-04 19:37:07.963 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop metrics successful.
2019-08-04 19:37:09.150 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=canal-transactional-id] Connection to node 0 could not be established. Broker may not be available.
(the last warning repeats continuously)
My config is as follows. conf/example/instance.properties:
cat conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
## mysql serverId
canal.instance.mysql.slaveId = 200
canal.instance.master.address = 172.20.10.14:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
conf/canal.properties:
cat conf/canal.properties
#################################################
######### common argument #############
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip = 172.20.10.15
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode =kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode, either MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# maximum supported transaction size; transactions larger than this are split into multiple deliveries
canal.instance.transaction.size = 1024
# fallback interval (in seconds) when reconnecting to a new mysql master
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = 172.20.10.17:9092
canal.mq.retries = 2
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = true
#canal.mq.properties. =
Finally figured it out: the problem isn't the canal config, it's the Kafka config file that needs changing. Update the Kafka config as follows:

vim config/server.properties
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
Hit the same problem and spent half a day on it @yetHandsome
Solved it. Although the error looks the same for everyone, the deployments differ. In my case canal and Kafka were not on the same network (Kafka on a cloud server with a public IP, canal inside an internal network). For that situation, the Kafka config is:

vi config/server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<public IP>:9092   // this line is the external-facing ip and port

That solved the problem.
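A quick sanity check that the advertised address is actually reachable (placeholder address; nc must be installed):

# must succeed from the machine running canal, because the producer
# reconnects to whatever address the broker advertises
nc -vz <public IP> 9092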
I'm using Kafka here; for reference only.
1. In canal's canal.properties, add a line: canal.mq.servers=ip:9092
2. In Kafka's server.properties, uncomment: listeners=PLAINTEXT://localhost:9092
3. Restart Kafka first, then restart Canal.
Cause: in versions after 1.1.4, the source logic for picking up the [RocketMQ, Kafka, ...] MQ config is:
/**
 * Compatibility shim for mq config keys from versions <= 1.1.4
 */
protected void doMoreCompatibleConvert(String oldKey, String newKey, Properties properties) {
    String value = PropertiesUtils.getProperty(properties, oldKey);
    if (StringUtils.isNotEmpty(value)) {
        properties.setProperty(newKey, value);
    }
}
But the new version's config file doesn't include the old version's key, so the MQ address can never be picked up. Source path: connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/AbstractMQProducer.java
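Concretely, the shim maps an old-style key onto its new-style equivalent, e.g. (kafka.bootstrap.servers as the new-style name is my reading of the 1.1.5+ default canal.properties; verify against your version):

# old-style key (<=1.1.4), copied forward by doMoreCompatibleConvert
canal.mq.servers = 192.168.175.155:9092
# new-style key (1.1.5+) that the Kafka producer actually reads
kafka.bootstrap.servers = 192.168.175.155:9092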
@yetHandsome Isn't that just the default config? The defaults don't work for me [kafka v2.8, canal v1.1.5].
The public-IP advertised.listeners fix above did it for me too. Legend 🐂, I'd been at this for a whole day.
Hi experts,
After starting canal with sh ./bin/startup.sh, I see a flood of the errors below in ./log/canal/canal.log (Connection to node -1 could not be established. Broker may not be available.), and the binlog never makes it to Kafka. Could someone take a look? Thanks.

2018-11-14 17:07:09.669 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.155:11111]
2018-11-14 17:07:10.601 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2018-11-14 17:07:10.719 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-11-14 17:07:10.800 [destination = example , address = /192.168.175.140:3406 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.175.140","port":3406}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000013","position":6642707,"serverId":1001,"timestamp":1541529191000}}
2018-11-14 17:07:10.826 [destination = example , address = /192.168.175.140:3406 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - find start position : EntryPosition[included=false,journalName=mysql-bin.000013,position=6642707,serverId=1001,gtid=,timestamp=1541529191000]
2018-11-14 17:07:11.639 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2018-11-14 17:07:11.690 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2018-11-14 17:07:11.792 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
(each warning appears twice and the pattern repeats continuously)
I also tried the workaround in this article, without success: http://www.voidcn.com/article/p-yorufhta-brs.html
Pushing messages to Kafka by hand works fine:

echo "delete db002.table002 where id=1;" | /data/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hihihi
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hihihi --from-beginning
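Note that both commands above target localhost, so they only prove the broker is reachable from its own host. It may be worth repeating the consumer test against the address the broker actually advertises (host below taken from the configs earlier in this thread):

/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.175.155:9092 --topic hihihi --from-beginning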
Here is the kafka.yml config:

cat >conf/kafka.yml<<EOF
servers: 127.0.0.1:9092
retries: 0
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
canalBatchSize: 50
canalGetTimeout: 100
flatMessage: true
canalDestinations:
Here is the instance.properties config:

cat >conf/example/instance.properties<<EOF
canal.instance.mysql.slaveId=175140
canal.instance.gtidon=false
canal.instance.master.address=192.168.175.140:3406
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=canalpwd
canal.instance.connectionCharset=utf-8
canal.instance.defaultDatabaseName=db002
canal.instance.enableDruid=false
canal.instance.filter.regex=db002.table002;
canal.instance.filter.black.regex=
EOF
Here is the canal.properties config:

cp conf/canal.properties conf/canal.properties.$(date +%Y%m%d%H%M%S)
sed -i '/canal.id/c canal.id=1' conf/canal.properties
sed -i '/canal.ip/c canal.ip=127.0.0.1' conf/canal.properties
sed -i '/canal.port/c canal.port=11111' conf/canal.properties
sed -i '/canal.metrics.pull.port/c canal.metrics.pull.port=11112' conf/canal.properties
sed -i '/canal.zkServers/c canal.zkServers=127.0.0.1:2181' conf/canal.properties
sed -i '/canal.withoutNetty/c canal.withoutNetty=true' conf/canal.properties
sed -i '/canal.serverMode/c canal.serverMode=kafka' conf/canal.properties
sed -i '/canal.destinations/c canal.destinations=example' conf/canal.properties
sed -i '/canal.instance.tsdb.spring.xml/c #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml' conf/canal.properties
Here is the Kafka config:

cat >/data/kafka/config/server.properties<<EOF
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=30000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=30000
group.initial.rebalance.delay.ms=0
EOF