nacos-group / nacos-sync

Service Sync component
Apache License 2.0
297 stars 144 forks source link

支持Nacos Cluster级别的服务同步,基于Group级别的服务同步增强 #348

Open iYaDongWang opened 8 months ago

iYaDongWang commented 8 months ago

nacos-sync默认是Service级别的服务同步,需要在控制台配置ServiceName和GroupName才可以进行服务同步。 自pr304之后也支持基于Ns-Group级别的服务同步。 image 通过将ServiceName配置为”ALL“以实现Group级别的同步。 这个pr给了我很大的帮助,我在这个基础上做了一些拓展,让nacos-sync支持Cluster级别的服务同步,只需要在新建同步任务的时候将ServiceName和GroupName均配置为”ALL“即可。

主要改动点在com.alibaba.nacossync.timer.CheckRunningStatusAllThread#run

改动前逻辑:根据GroupName查询服务列表,然后进行同步 改动后逻辑:判断GroupName是否为ALL,如果为false,逻辑和之前一致,根据GroupName查询服务列表,然后进行同步; 如果为true,通过增强后的NacosEnhanceNamingService查询集群下所有服务列表,然后进行同步。 对原有功能无影响。


    public void run() {
        Long startTime = System.currentTimeMillis();
        try {
            // 查找所有ServiceName为”ALL“的任务
            List<TaskDO> taskDOS = taskAccessService.findServiceNameIsNull()
                    .stream().filter(t -> t.getStatus() == null || t.getStatus() == 0)
                    .collect(Collectors.toList());
            if (CollectionUtils.isEmpty(taskDOS)) {
                return;
            }
            for (TaskDO taskDO : taskDOS) {
                // 改动点:根据GroupName查询Group下所有服务 =》 如果GroupName为”ALL“,查询集群下所有服务,根据GroupName进行分组;如果不为”ALL“,则逻辑不变。
                Map<String, List<String>> serviceNameListGroupByGroupName = getServiceNameListGroupByGroupName(taskDO);
                if (serviceNameListGroupByGroupName.isEmpty()) {
                    continue;
                }
                for (Map.Entry<String, List<String>> entry : serviceNameListGroupByGroupName.entrySet()) {
                    taskDO.setGroupName(entry.getKey());
                    List<String> serviceNameList = entry.getValue();

                    //如果是null,证明此时没有处理完成
                    List<String> filterService = serviceNameList.stream()
                            .filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null)
                            .collect(Collectors.toList());

                    if (CollectionUtils.isEmpty(filterService)) {
                        continue;
                    }

                    // 当删除任务后,此时任务的状态为DELETE,不会执行数据同步
                    if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
                        fastSyncHelper.syncWithThread(taskDO, filterService);
                    }
                }
            }
        }catch (Exception e) {
            log.warn("CheckRunningStatusThread Exception ", e);
        }
        metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - startTime);
    }

    private Map<String, List<String>> getServiceNameListGroupByGroupName(TaskDO taskDO) throws NacosException {
        Map<String, List<String>> map = new HashMap<>();
        NamingService namingService = nacosServerHolder.get(taskDO.getSourceClusterId());
        if (SkyWalkerConstants.ALL.equals(taskDO.getGroupName())) {
            // 如果serviceName和GroupName都是ALL,查询集群下所有服务
            NacosEnhanceNamingService enhanceNamingService = new NacosEnhanceNamingService(namingService);
            CatalogServiceResult catalogServiceResult = enhanceNamingService.catalogServices(null, null);
            if (catalogServiceResult == null || catalogServiceResult.getCount() <= 0) {
                return map;
            }
            List<ServiceView> serviceList = catalogServiceResult.getServiceList();
            return serviceList.stream()
                    .collect(Collectors.groupingBy(ServiceView::getGroupName, Collectors.mapping(ServiceView::getName, Collectors.toList())));

        }
        // GroupName不为ALL,查询该Group下的所有服务
        ListView<String> servicesOfServer = namingService.getServicesOfServer(0, Integer.MAX_VALUE,
                taskDO.getGroupName());
        map.put(taskDO.getGroupName(), servicesOfServer.getData());
        return map;
    }
iYaDongWang commented 8 months ago

https://github.com/nacos-group/nacos-sync/pull/344