Closed lwd-coding closed 2 weeks ago
kafka已经配置了max.in.flight.requests.per.connection = 1但仍发现canal deploy往kafka发送消息的时候会存在乱序现象,发生乱序的时候存在如下关键日志信息: instance日志:
zk日志:
每个instance都会通过监听各自在zk的临时数据节点runningData来实现HA机制,当出现极端的网络波动或者假死比如频繁full gc等,就会导致zk和客户端的会话无法正常续期,从而出现会话超时导致临时节点删除;当故障恢复此时客户端会触发没有改数据节点事件,改事件会触发ServerRunningMonitor.initRunning initRunning如果抢占runningData成功则会调用CanalMQStarter.startDestination,startDestination的逻辑是先stop再start,由于stop只是设置一个标识位,只有下次轮询才会退出,这时候立马start一个线程就会出现多线程进行get,commit,rollback,存在乱序,丢失数据的可能
乱序场景: send的时候出现并行发送但是ack的时候确是有序进行,此时不会报错,依然满足按最小batchId进行ack
无法正常commit和rollback场景: is not the firstly可能会导致无法ack进而触发事件buffer积压从而无法消费;无法rollback进而丢失数据
抢占成功以后如果mq生产者已经启动就没必要stop再start,跳过就行了。这里重启个人觉得没有啥特殊意义
tks
优化了下stop/start的并发控制,增加stop by latch.wait()的动作,确保正常退出
问题现象:
kafka已经配置了max.in.flight.requests.per.connection = 1但仍发现canal deploy往kafka发送消息的时候会存在乱序现象,发生乱序的时候存在如下关键日志信息: instance日志:
zk日志:
问题分析:
每个instance都会通过监听各自在zk的临时数据节点runningData来实现HA机制,当出现极端的网络波动或者假死比如频繁full gc等,就会导致zk和客户端的会话无法正常续期,从而出现会话超时导致临时节点删除;当故障恢复此时客户端会触发没有改数据节点事件,改事件会触发ServerRunningMonitor.initRunning initRunning如果抢占runningData成功则会调用CanalMQStarter.startDestination,startDestination的逻辑是先stop再start,由于stop只是设置一个标识位,只有下次轮询才会退出,这时候立马start一个线程就会出现多线程进行get,commit,rollback,存在乱序,丢失数据的可能
同个instance出现多线程生产者问题案例:
乱序场景: send的时候出现并行发送但是ack的时候确是有序进行,此时不会报错,依然满足按最小batchId进行ack
无法正常commit和rollback场景: is not the firstly可能会导致无法ack进而触发事件buffer积压从而无法消费;无法rollback进而丢失数据
解决思路:
抢占成功以后如果mq生产者已经启动就没必要stop再start,跳过就行了。这里重启个人觉得没有啥特殊意义