alibaba / nacos

an easy-to-use dynamic service discovery, configuration and service management platform for building cloud native applications.
https://nacos.io
Apache License 2.0
30.21k stars 12.83k forks source link

是否可以增加一个方法,可以监听任意一个ServiceInfo的变化事件? #2133

Open bugCats opened 4 years ago

bugCats commented 4 years ago

如题: 如果需要监听某个ServiceInfo的变化,必须需要指定它的serviceName。但是,如果需要监听所有的ServiceInfo变化,那么情况就变得有点复杂:先是获取到所有的注册信息,然后循环遍历,添加订阅,还要移除失效的监听。

有个业务场景: nacos作为注册中心,在与Springcloud getaway集成的时候,希望服务在上下线时,gateway执行指定的代码(记录日志、改变路由策略、快速感知服务下线、触发某个事件等)。

希望能增加一个方法,添加监听事件:任意一个ServiceInfo发送了变化时触发,不需要serviceName、clusters限制。由业务层决定该如何处理这个事件。

或者已经有这个功能,麻烦告知一下如何调用。

KeRan213539 commented 4 years ago

我也有类似需求,不过我是自己在gateway实现了定时获取服务列表并判断是否有变化,再更新gateway中的路由...

bugCats commented 4 years ago

我也有类似需求,不过我是自己在网关实现了定时获取服务列表并判断是否有变化,再更新gateway中的路由...

已经找到事件监听的调度类:com.alibaba.nacos.client.naming.core.EventDispatcher。准备进行扩展的,结果发现没法弄,EventDispatcher对象是在com.alibaba.nacos.client.naming.NacosNamingService#init这个私有方法里面创建的╮(╯3╰)╭

使用定时获取,间隔短了浪费性能,间隔长实时性较差。

目前做法是:nacos也作为配置中心使用,gateway订阅一个配置文件的变化。 服务上下线时,服务修改这个配置文件。然后gateway收到配置文件变化时,再主动获取服务列表,对比上次获取的服务列表,添加新监听或者移除失效的监听,再更新gateway路由... (dog.jpg

KeRan213539 commented 4 years ago

类似你目前做法的我之前也搞过,直接在nacos配制里放一个路由结构的JSON,通过监听这个JSON变化来改变本地路由.这样也很麻烦. 之前我也考虑过在nacos中增加推送的功能,简单研究了一下不太好实现,最后图方便还是放弃了 nacos准备增加gRpc的通信方式了,到时候再看看好不好增加这个推送,我现在先用轮询方式过渡一下.

xbcrh commented 4 years ago

路由结构的json,如果服务多的话要配置好多路由,,,并且轮询的话感觉还是不好。还是监听某个配置文件吧,有服务上线只需要改下配置文件就可以。然后获取服务列表,重新更新gateway路由。但我发现获取服务列表时怎么把我已下线的服务也获取到了,还得一个个查询该服务有没有健康实例,不知道有没有好的方法能直接获取线上的服务

KeRan213539 commented 4 years ago

还是nacos里能增加监听最好,看看有没有谁会把这个ISSUE搞了...

bugCats commented 4 years ago

@maketubo 我看了一下,如果修改HostReactor类,这样的话,EventListener里面的内容,会阻塞扫描线程。

其实所有的EventListener触发,都在这个内部类里面完成com.alibaba.nacos.client.naming.core.EventDispatcher.Notifier, 从这个类着手修改。

以下是我修改的思路:

EventDispatcher:添加移除监听;再在NamingService暴露新增的2个方法:

private ConcurrentMap<String, List<EventListener>> globalMap = new ConcurrentHashMap<String, List<EventListener>>();
public void addListener(String clusters, EventListener listener){
    NAMING_LOGGER.info("[LISTENER] adding * with " + clusters + " to listener map");
    List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
    observers.add(listener);
    observers = globalMap.putIfAbsent(clusters, observers);
    if (observers != null) {
        observers.add(listener);
    }
}
public void removeListener(String clusters, EventListener listener){
    NAMING_LOGGER.info("[LISTENER] removing * with " + clusters + " from listener map");

    List<EventListener> observers = globalMap.get(clusters);
    if (observers != null) {
        Iterator<EventListener> iter = observers.iterator();
        while (iter.hasNext()) {
            EventListener oldListener = iter.next();
            if (oldListener.equals(listener)) {
                iter.remove();
            }
        }
        if (observers.isEmpty()) {
            observerMap.remove(clusters);
        }
    }
}

Notifier

private class Notifier implements Runnable {
    @Override
    public void run() {
        while (true) {
            ServiceInfo serviceInfo = null;
            try {
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }

            if (serviceInfo == null) {
                continue;
            }

            try {

           //任意一个服务发生变化,触发事件
                List<EventListener> globalListeners = globalMap.get(serviceInfo.getClusters());
                if (!CollectionUtils.isEmpty(globalListeners)) {
                    for (EventListener listener : globalListeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                    }
                }

                List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

                if (!CollectionUtils.isEmpty(listeners)) {
                    for (EventListener listener : listeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                    }
                }

            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] notify error for service: "
                    + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
            }
        }
    }
}
2YSP commented 1 year ago

我也有类似需求,不过我是自己在网关实现了定时获取服务列表并判断是否有变化,再更新gateway中的路由...

已经找到事件监听的调度类:com.alibaba.nacos.client.naming.core.EventDispatcher。准备进行扩展的,结果发现没法弄,EventDispatcher对象是在com.alibaba.nacos.client.naming.NacosNamingService#init这个私有方法里面创建的╮(╯3╰)╭

使用定时获取,间隔短了浪费性能,间隔长实时性较差。

目前做法是:nacos也作为配置中心使用,gateway订阅一个配置文件的变化。 服务上下线时,服务修改这个配置文件。然后gateway收到配置文件变化时,再主动获取服务列表,对比上次获取的服务列表,添加新监听或者移除失效的监听,再更新gateway路由... (dog.jpg

@maketubo 我看了一下,如果修改HostReactor类,这样的话,EventListener里面的内容,会阻塞扫描线程。

其实所有的EventListener触发,都在这个内部类里面完成com.alibaba.nacos.client.naming.core.EventDispatcher.Notifier, 从这个类着手修改。

以下是我修改的思路:

EventDispatcher:添加移除监听;再在NamingService暴露新增的2个方法:

private ConcurrentMap<String, List<EventListener>> globalMap = new ConcurrentHashMap<String, List<EventListener>>();
public void addListener(String clusters, EventListener listener){
    NAMING_LOGGER.info("[LISTENER] adding * with " + clusters + " to listener map");
    List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
    observers.add(listener);
    observers = globalMap.putIfAbsent(clusters, observers);
    if (observers != null) {
        observers.add(listener);
    }
}
public void removeListener(String clusters, EventListener listener){
    NAMING_LOGGER.info("[LISTENER] removing * with " + clusters + " from listener map");

    List<EventListener> observers = globalMap.get(clusters);
    if (observers != null) {
        Iterator<EventListener> iter = observers.iterator();
        while (iter.hasNext()) {
            EventListener oldListener = iter.next();
            if (oldListener.equals(listener)) {
                iter.remove();
            }
        }
        if (observers.isEmpty()) {
            observerMap.remove(clusters);
        }
    }
}

Notifier

private class Notifier implements Runnable {
    @Override
    public void run() {
        while (true) {
            ServiceInfo serviceInfo = null;
            try {
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }

            if (serviceInfo == null) {
                continue;
            }

            try {

           //任意一个服务发生变化,触发事件
                List<EventListener> globalListeners = globalMap.get(serviceInfo.getClusters());
                if (!CollectionUtils.isEmpty(globalListeners)) {
                    for (EventListener listener : globalListeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                    }
                }

                List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

                if (!CollectionUtils.isEmpty(listeners)) {
                    for (EventListener listener : listeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                    }
                }

            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] notify error for service: "
                    + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
            }
        }
    }
}

我也有类似需求,不过我是自己在网关实现了定时获取服务列表并判断是否有变化,再更新gateway中的路由...

已经找到事件监听的调度类:com.alibaba.nacos.client.naming.core.EventDispatcher。准备进行扩展的,结果发现没法弄,EventDispatcher对象是在com.alibaba.nacos.client.naming.NacosNamingService#init这个私有方法里面创建的╮(╯3╰)╭

使用定时获取,间隔短了浪费性能,间隔长实时性较差。

目前做法是:nacos也作为配置中心使用,gateway订阅一个配置文件的变化。 服务上下线时,服务修改这个配置文件。然后gateway收到配置文件变化时,再主动获取服务列表,对比上次获取的服务列表,添加新监听或者移除失效的监听,再更新gateway路由... (dog.jpg

你这个想法跟我之前写自己网关时的想法一样啊,当时网关集群化需要感知服务配置的修改(如权重修改),我也是订阅了一个配置变更实现的

KomachiSion commented 6 months ago

Will be solved by #10380