alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.34k stars 7.58k forks source link

k8s跑canal server不能在pod重建后自动删除 #2751

Open zherenyu831 opened 4 years ago

zherenyu831 commented 4 years ago

Question

k8s跑canal server不能在pod重建后自动删除 不知道有什么设置可以设的

784134748 commented 4 years ago

我是使用的statefullset的方式部署,那么pod的访问域名是固定的,然后将域名代替podIp注册到canal-admin中,这样pod重启就不用关心podIp变动的问题了,不知道这个是不是你想要的答案

uddiluke commented 2 years ago

我是使用的statefullset的方式部署,那么pod的访问域名是固定的,然后将域名代替podIp注册到canal-admin中,这样pod重启就不用关心podIp变动的问题了,不知道这个是不是你想要的答案

这样能解决问题(不过重建pod速度还是有损耗).还是希望canal-admin能提供功能,自动删除断线状态的server。

uddiluke commented 2 years ago

我是使用的statefullset的方式部署,那么pod的访问域名是固定的,然后将域名代替podIp注册到canal-admin中,这样pod重启就不用关心podIp变动的问题了,不知道这个是不是你想要的答案

这样能解决问题(不过重建pod速度还是有损耗).还是希望canal-admin能提供功能,自动删除断线状态的server。

做了个job 2秒内检查并删除掉pod。编译后 更新admin的doc image。临时解决

@ConditionalOnProperty(name="enableRemoveDisconnecteServer") @Component public class CheckAndRemoveDisconnecteServerJob { @Autowired NodeServerService nodeServerService; private static final Logger logger = LoggerFactory.getLogger(CheckAndRemoveDisconnecteServerJob.class); private List lastConnectedServers=new ArrayList<>();

/**
 * 2秒检查一次server状态 曾经处于正常状态(1 或0) 然后变成-1的 则进行清理。
 * 启动时已经处于断线状态的(-1) 只能通过手动来处理。
 */
@Scheduled(cron = "0/2 * * * * *")
private void disposeFormalOrderMissingData(){
    try{
        Pager<NodeServer> nodeServerPager=nodeServerService.findList(null,new Pager<NodeServer>(1,100));
        if(nodeServerPager!=null && nodeServerPager.getItems()!=null){
            List<NodeServer> nodeServerList=nodeServerPager.getItems();
            nodeServerList.stream().forEach(nodeServer -> {
                if(lastConnectedServers.contains(nodeServer.getId())
                   && "-1".equals(nodeServer.getStatus())){
                    //已连接节点下线 k8s重启节点 立刻删除
                    nodeServerService.delete(nodeServer.getId());
                    lastConnectedServers.remove(nodeServer.getId());
                    logger.info("delete node "+nodeServer.getIp());
                }else if(!lastConnectedServers.contains(nodeServer.getId())
                        &&  !"-1".equals(nodeServer.getStatus())){
                    //新增节点 且节点处于正常状态 添加
                    lastConnectedServers.add(nodeServer.getId());
                    logger.info("add node "+nodeServer.getIp());
                }else if(!lastConnectedServers.contains(nodeServer.getId()))
                  logger.info("missing node {} {} {}",nodeServer.getIp(),nodeServer.getId(),nodeServer.getStatus());
            });

        }else{
            logger.info("has no node ");
        }
    }catch(Exception e){
        logger.error("disposeFormalOrderMissingData",e);
    }
}

}