polarismesh / polaris-java-agent

Polaris Java Agent for Spring Cloud and Dubbo
Other
37 stars 22 forks source link

polaris-java-agent设计 #1

Closed andrewshan closed 8 months ago

andrewshan commented 2 years ago

功能架构

image

java-agent作为子节点agent的方式,通过Instrument注入的方式,嵌入到用户的Java进程中,通过拦截具体框架的执行逻辑,对接到北极星SDK,实现服务治理能力。

需要支持的功能包括北极星原生的服务治理能力:服务注册/反注册/心跳、服务发现&负载均衡,动态路由、故障熔断、服务限流

以及高级的场景化能力:无损下线、调用链跟踪、全链路灰度等

运行方式

需要支持2种使用模式:

代码架构

image

andrewshan commented 2 years ago

配置设计

zifeiyu0531 commented 2 years ago

Dubbo插件设计

Dubbo架构说明

官方网址

服务注册与服务发现

sub 图例说明:

服务暴露

服务提供方暴露服务的时序图如下 export

服务调用

在Dubbo项目服务调用方初始化阶段,服务调用方会从注册中心中取出需要调用的服务信息,包括服务对应的ip:port信息等。并将其封装成Invoker对象。初始化阶段的时序图如下 refer

初始化阶段结束后,Dubbo服务调用方发起服务调用的流程如下 invoke

核心模型

Protocol 它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。 Invoker 实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Invoker中包含URL对象,URL中保存了服务的配置信息,包括ip,port等。 Invocation 会话域,它持有调用过程中的变量,比如方法名,参数等。

Dubbo接入Polaris

1. 服务注册

Dubbo服务提供方项目启动时,需要在启动参数中添加namespace和ttl信息,示例如下

java -javaagent:xxx.jar -Dnamespace=Dubbo -Dttl=5 -jar xxx.jar

回顾服务提供方暴露服务的时序图 export2

在ServiceConfig当中,会调用Protocol类的export()方法进行服务暴露,其中export()方法的具体定义如下

<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

可以看到export()方法接收一个Invoker对象作为参数,我们可以在export()方法结束处进行拦截,获取到这个Invoker对象。使用invoker.getUrl()方法可以获取Invoker对象中的url属性。

2. 服务反注册

使用Java提供的Runtime.getRuntime().addShutdownHook()在JVM关闭之前执行服务反注册逻辑

3. 心跳上报

在服务注册之后,启动一个定时线程进行心跳上报

4. 服务发现

整体思路

第一次拦截 回顾服务调用方初始化阶段的时序图 refer2

初始化阶段调用了Protocol类的refer()方法生成Invoker对象,其中refer()方法的具体定义如下

<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

可以看到refer()方法接收URL对象作为其中一个参数,url中包含了服务的配置信息,包括ip,port等。我们可以在refer()方法结束处进行拦截,获取到该方法的返回值Invoker对象。使用一个map<String, Invoker>结构记录ip:portInvoker对象的映射关系

第二次拦截 将上述服务调用方发起服务调用的流程简化成下图 invoke2 观察ClusterInvoker类的构造器

public AbstractClusterInvoker(Directory<T> directory, URL url) {
    if (directory == null) {
        throw new IllegalArgumentException("service directory == null");
    }
    this.directory = directory;
    this.availablecheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
}

其中this.directoryDirectory对象,Directory类实现了List<Invoker<T>> list(Invocation invocation)方法,接收一个Invocation对象,返回本次服务调用可用的所有Invoker对象列表 在ClusterInvoker类的构造器结束处进行拦截,将this.directory指定为自己继承的Directory子类,在子类重写list()方法:根据Invocation对象获取service信息,结合namespace使用Polaris的ConsumerApi获取所有instance,根据instance的ip:port信息查询map<String, Invoker>映射关系组装成List<Invoker>返回 consumer1

第三次拦截 Dubbo服务调用方项目启动时,可以在启动参数中添加loadbalance信息,示例如下

java -javaagent:xxx.jar -Dloadbalance=random -jar xxx.jar

观察ClusterInvoker类的invoke()方法,Dubbo项目的每次服务调用都要执行invoke()方法,用于发起请求,获取结果

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

其中List<Invoker<T>> invokers = list(invocation);方法已经在第二次拦截中完成了重写,接下来Dubbo会使用LoadBalance loadbalance = initLoadBalance(invokers, invocation);来获取LoadBalance对象,并在接下来的执行中根据LoadBalance对象中定义的负载均衡策略选取合适的Invoker对象来执行服务调用 观察initLoadBalance(invokers, invocation)方法

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}

可以看到这里是使用ExtensionLoadergetExtension()方法获取的LoadBalance对象 观察getExtension()方法

public T getExtension(String name, boolean wrap) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name, wrap);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

这里将对象包装成了Holder类,使用getOrCreateHolder(name)方法返回Holder对象,再通过holder.get()获取到具体对象,如:当name="random"时,get()返回的就是RandomLoadBalance对象 继续观察getOrCreateHolder()方法

private Holder<Object> getOrCreateHolder(String name) {
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<>());
        holder = cachedInstances.get(name);
    }
    return holder;
}

可以看到Holder对象其实是从一个cachedInstances中取出的,cachedInstancesExtensionLoader类的成员变量,类型为ConcurrentMap<String, Holder<Object>>。 因此可以在getOrCreateHolder()方法之前进行拦截 consumer2 通过入参name的值来判断本次调用的目的是否为获取一个LoadBalance对象,若是,则获取启动参数中的loadbalance信息作为name的新值,查看cachedInstances中是否有对应的value且类型为自己定义的LoadBalance,如果不是,则需新实例化一个自己的LoadBalance对象加入cachedInstances,具体的流程如下图所示 flow

Dubbo插件结构

启动参数

参数名 描述 示例 主调方 被调方
namespace 服务所属的命名空间 -Dnamespace=Dubbo
ttl 服务心跳上报间隔 -Dttl=5
loadbalance 负载均衡策略 -Dloadbalance=random
Zyh2333 commented 2 years ago

Spring-Cloud2021插件设计

Agent参数初始化

agent需要大量参数供执行后续所有操作,其中包含用户注入参数及Spring启动参数:

用户启动注入参数

  1. polaris.server.address:host:8091 作为北极星server地址

  2. polaris.namespace:default 作为该服务注册的命名空间

    -Dpolaris.namespace=default -Dpolaris.server.address=host:8091

从Spring上下文获取参数

  1. server.port:作为北极星监听端口

  2. spring.application.name:作为服务名称

  3. spring.cloud.client.ip-address:作为instance的host

拦截Spring启动以获取参数

上述与Spring Web应用相关的参数在Spring启动时都会保存在WebApplicationContext中,故在WebApplicationContext初始化完成后执行拦截。

具体拦截点在SpringApplication类中拦截refreshContext方法,refreshContext方法会在当前Spring Context初始化好后执行:

public ConfigurableApplicationContext run(String... args) {
        long startTime = System.nanoTime();
        DefaultBootstrapContext bootstrapContext = createBootstrapContext();
        ConfigurableApplicationContext context = null;
        configureHeadlessProperty();
        SpringApplicationRunListeners listeners = getRunListeners(args);
        listeners.starting(bootstrapContext, this.mainApplicationClass);
        try {
            ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
            ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
            configureIgnoreBeanInfo(environment);
            Banner printedBanner = printBanner(environment);
            context = createApplicationContext();
            context.setApplicationStartup(this.applicationStartup);
            prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
            // 该处为拦截点!!!
            refreshContext(context);
            afterRefresh(context, applicationArguments);
            Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
            if (this.logStartupInfo) {
                new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
            }
            listeners.started(context, timeTakenToStartup);
            callRunners(context, applicationArguments);
        }
        catch (Throwable ex) {
            handleRunFailure(context, ex, listeners);
            throw new IllegalStateException(ex);
        }
        try {
            Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
            listeners.ready(context, timeTakenToReady);
        }
        catch (Throwable ex) {
            handleRunFailure(context, ex, null);
            throw new IllegalStateException(ex);
        }
        return context;
    }

具体拦截逻辑

该方法只有一个context参数,该参数即为所需的WebApplicationContext对象,在该对象中获取Spring Context的Environment后再按照application.yml中配置的key去获取相应属性即可。

该处须注意所有的Spring Context都要走该流程,所以需要筛选出是否为GenericReactiveWebApplicationContext或GenericWebApplicationContext的实例

初始化好Agent参数后将其保存起来供整个Agent生命周期使用,即PolarisAgentProperties对象。

@Override
    public void beforeInterceptor(Object target, Object[] args, PolarisAgentProperties agentProperties) {
        // check if servlet applicationContext or reactive applicationContext
        Object configurableContext = args[0];
        if (configurableContext instanceof GenericWebApplicationContext || configurableContext instanceof GenericReactiveWebApplicationContext) {

            // log
            LogUtils.logTargetFound(target);

            // convert to applicationContext, actual AnnotationConfigServletWebApplicationContext or AnnotationConfigReactiveWebServerApplicationContext
            ApplicationContext applicationContext = (ApplicationContext) configurableContext;

            // reserve application context for agent
            SpringContextFactory.setApplicationContext(applicationContext);

            // get basic info from applicationContext
            port = applicationContext.getEnvironment().getProperty("server.port");
            service = applicationContext.getEnvironment().getProperty("spring.application.name");
            host = applicationContext.getEnvironment().getProperty("spring.cloud.client.ip-address");
            Assert.notNull(port, "the server port can't be null, please check your server config");
            Assert.notNull(service, "the application name can't be null, please check your spring config");

            logger.info("Polaris service is set with port: {}", port);
            logger.info("Polaris service is set with service: {}", service);
            logger.info("Polaris service is set with host: {}", host);

            // get init info from system
            String host = HostUtils.getHost();
            String namespace = System.getProperty("polaris.namespace");
            String serverAddress = System.getProperty("polaris.server.address");
            Assert.notNull(serverAddress, "the polaris server address can't be null, please check your polaris agent parameter");
            if (StringUtils.isEmpty(namespace)) {
                namespace = "default";
//                logger.warn("the input namespace is empty, use default instead");
                System.out.println("the input namespace is empty, use default instead");
            }

            // init polaris config and reserve
            PolarisAgentProperties polarisAgentProperties = new PolarisAgentProperties();
            polarisAgentProperties.setHost(host);
            polarisAgentProperties.setPort(Integer.valueOf(port));
            polarisAgentProperties.setProtocol("grpc");
            polarisAgentProperties.setNamespace(namespace);
            polarisAgentProperties.setService(service);
            polarisAgentProperties.setServerAddress(serverAddress);
            PolarisAgentPropertiesFactory.setPolarisAgentProperties(polarisAgentProperties);

            // init polarisContext and api
            PolarisContext polarisContext = new PolarisContext(polarisAgentProperties);
            PolarisAPIFactory.init(polarisContext);
        }
    }

服务注册

Spring-cloud服务注册规范为:将服务实例抽象为Registration接口、将服务注册抽象为ServiceRegistry<Registration>接口

整体流程

image

Registration接口

包含ServiceId,Host和Port

public interface ServiceInstance {
    default String getInstanceId() {
        return null;
    }

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();

    default String getScheme() {
        return null;
    }
}

ServiceRegistry接口

Spring-cloud规范中完成服务注册动作的类:

包含注册、反注册、状态、关闭等操作接口

public interface ServiceRegistry<R extends Registration> {

    /**
     * Registers the registration. A registration typically has information about an
     * instance, such as its hostname and port.
     * @param registration registration meta data
     */
    void register(R registration);

    /**
     * Deregisters the registration.
     * @param registration registration meta data
     */
    void deregister(R registration);

    /**
     * Closes the ServiceRegistry. This is a lifecycle method.
     */
    void close();

    /**
     * Sets the status of the registration. The status values are determined by the
     * individual implementations.
     * @param registration The registration to update.
     * @param status The status to set.
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     */
    void setStatus(R registration, String status);

    /**
     * Gets the status of a particular registration.
     * @param registration The registration to query.
     * @param <T> The type of the status.
     * @return The status of the registration.
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     */
    <T> T getStatus(R registration);

}

Polaris服务注册时序图:

image

初始化SDKContext

通过Polaris-Api配置Configuration对象,在默认配置的基础之上绑定SpringCloud决策出的IP,为后续服务注册使用:

    private static Configuration configuration(PolarisAgentProperties polarisAgentProperties) {
        ConfigurationImpl configuration = (ConfigurationImpl) ConfigAPIFactory
                .defaultConfig(ConfigProvider.DEFAULT_CONFIG);
        configuration.setDefault();
        configuration.getGlobal().getAPI().setBindIP(PolarisServiceConstants.host);
        configuration.getGlobal().getServerConnector().setAddresses(Collections.singletonList(polarisAgentProperties.getServerAddress()));
        return configuration;
    }
// 初始化SDKContext
SDKContext.initContextByConfig(configuration());

初始化插件(默认配置可进行扩展)

通过用户传入的agent参数(必须传入server的地址,用以初始化ServerConnector)或者默认SPI方式,初始化Polaris所须的插件(可选),包括:ServerConnector、Router、HealthChecker、LoadBalancer、RateLimiter、CircuitBreaker等

Configuration.Extensions.init()

初始化ConsumerAPI和ProviderAPI

通过SDKContext,初始化Polaris用以服务发现和服务注册的两个API

    public static void init(PolarisContext polarisContext) {
        CONSUMER_API = DiscoveryAPIFactory.createConsumerAPIByContext(polarisContext.getSdkContext());
        PROVIDER_API = DiscoveryAPIFactory.createProviderAPIByContext(polarisContext.getSdkContext());
    }

agent拦截逻辑

拦截点

原逻辑为在WebServer启动完成后会发布WebServerInitializedEvents事件,然后再被监听从而触发服务注册流程,实则可以在WebServer启动完成时,也就是ApplicationContext完成上下文刷新时进行post拦截,如下所示:

在AbstractApplicationContext类中finishRefresh方法进行拦截,该处也须注意所有的Spring Context都要走该流程,所以需要筛选出是否为GenericReactiveWebApplicationContext或GenericWebApplicationContext的实例

public void refresh() throws BeansException, IllegalStateException {
        synchronized(this.startupShutdownMonitor) {
            this.prepareRefresh();
            ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
            this.prepareBeanFactory(beanFactory);

            try {
                this.postProcessBeanFactory(beanFactory);
                this.invokeBeanFactoryPostProcessors(beanFactory);
                this.registerBeanPostProcessors(beanFactory);
                this.initMessageSource();
                this.initApplicationEventMulticaster();
                this.onRefresh();
                this.registerListeners();
                this.finishBeanFactoryInitialization(beanFactory);
                this.finishRefresh();
                // 执行拦截
            } catch (BeansException var9) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var9);
                }

                this.destroyBeans();
                this.cancelRefresh(var9);
                throw var9;
            } finally {
                this.resetCommonCaches();
            }

        }
    }
拦截代码

拦截判断:

    @Override
    public void after(Object target, Object[] args, Object result, Throwable throwable) {
        if (target instanceof GenericWebApplicationContext || target instanceof GenericReactiveWebApplicationContext) {
            LogUtils.logTargetFound(target);
            this.afterInterceptor(target, args, result, throwable, PolarisAgentPropertiesFactory.getPolarisAgentProperties());
        }
    }

    @Override
    public void afterInterceptor(Object target, Object[] args, Object result, Throwable throwable, PolarisAgentProperties polarisAgentProperties) {
        AfterPolarisInterceptor polarisInterceptor = new PolarisRegistryPolarisInterceptor();
        polarisInterceptor.afterInterceptor(target, args, result, throwable, polarisAgentProperties);
    }

执行register逻辑:

    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for polaris client...");
            return;
        }
        // 注册实例
        InstanceRegisterRequest instanceRegisterRequest = new InstanceRegisterRequest();
        instanceRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
        instanceRegisterRequest.setService(registration.getServiceId());
        instanceRegisterRequest.setHost(registration.getHost());
        instanceRegisterRequest.setPort(registration.getPort());
        instanceRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
        if (null != heartbeatExecutor) {
            instanceRegisterRequest.setTtl(ttl);
        }
//        instanceRegisterRequest.setMetadata(metadataLocalProperties.getContent());
        instanceRegisterRequest.setProtocol(polarisContext.getPolarisContextAgentProperties().getProtocol());
        instanceRegisterRequest.setVersion(polarisContext.getPolarisContextAgentProperties().getVersion());
        try {
            ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
            providerClient.register(instanceRegisterRequest);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));
            if (null != heartbeatExecutor) {
                InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
                BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
                //注册成功后开始启动心跳线程
                heartbeat(heartbeatRequest);
            }
        } catch (Exception e) {
            log.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e);
            rethrowRuntimeException(e);
        }
    }

服务反注册

使用Java提供的Runtime.getRuntime().addShutdownHook()在JVM关闭之前执行服务反注册逻辑

    Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));
    @Override
    public void deregister(Registration registration) {
        log.info("De-registering from Polaris Server now...");

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No dom to de-register for polaris client...");
            return;
        }

        InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest();
        deRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
        deRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
        deRegisterRequest.setNamespace("default");
        deRegisterRequest.setService(registration.getServiceId());
        deRegisterRequest.setHost(registration.getHost());
        deRegisterRequest.setPort(registration.getPort());

        try {
            ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
            providerClient.deRegister(deRegisterRequest);
        } catch (Exception e) {
            log.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e);
        } finally {
            if (null != heartbeatExecutor) {
                heartbeatExecutor.shutdown();
            }
        }
        log.info("De-registration finished.");
    }

服务发现

实现spring-cloud规范中的DiscoveryClient接口,并将其插入到Spring-Cloud服务发现DiscoveryClient列表中

整体流程

image

DiscoveryClient接口

主要包含getInstances和getServices两个方法

public interface DiscoveryClient extends Ordered {

    /**
     * Default order of the discovery client.
     */
    int DEFAULT_ORDER = 0;

    /**
     * A human-readable description of the implementation, used in HealthIndicator.
     * @return The description.
     */
    String description();

    /**
     * Gets all ServiceInstances associated with a particular serviceId.
     * @param serviceId The serviceId to query.
     * @return A List of ServiceInstance.
     */
    List<ServiceInstance> getInstances(String serviceId);

    /**
     * @return All known service IDs.
     */
    List<String> getServices();

    /**
     * Default implementation for getting order of discovery clients.
     * @return order
     */
    @Override
    default int getOrder() {
        return DEFAULT_ORDER;
    }

}

Polaris服务发现时序图

image

拦截逻辑

拦截CompositeDiscoveryClient类的构造函数,将原DiscoveryClient列表清空,再将PolarisDiscoveryClient添加进去,以执行Polaris的服务发现逻辑

public class CompositeDiscoveryClient implements DiscoveryClient {

    private final List<DiscoveryClient> discoveryClients;

    public CompositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {}
        AnnotationAwareOrderComparator.sort(discoveryClients);
        this.discoveryClients = discoveryClients;
           // post拦截点 <init>
           // 替换为PolarisDiscoveryClient
    }
}

服务发现逻辑实现

在PolarisDiscoveryClient中实现具体的调用Polaris的on获取服务及实例的逻辑

   /**
     * 获取服务路由后的实例列表
     *
     * @param service 服务名
     * @return 服务实例列表
     */
    public InstancesResponse getFilteredInstances(String service) {
        String namespace = polarisAgentProperties.getNamespace();
        GetInstancesRequest getInstancesRequest = new GetInstancesRequest();
        getInstancesRequest.setNamespace(namespace);
        getInstancesRequest.setService(service);
        return consumerAPI.getInstances(getInstancesRequest);
    }

 负载均衡

Ribbon

经过调研,尽管在新版本中已经逐渐淘汰Ribbon作为负载均衡,但是Ribbon依然在从Spring Cloud G版H版中被作为默认负载均衡器,并且如果在2020.0及以后版本引入,则依然会使用ribbon

ribbon特点

很多地方不使用Spring-cloud服务注册与发现的规范,而是重新定义接口,例如:

  1. IClientConfig:Client配置接口

  2. IRule:负载均衡策略接口

  3. IPing:状态健康接口

  4. ILoadBalancer:负载均衡策略接口

image

  1. Server:对应Spring-cloud规范中的Instance

  2. ServerList:服务发现规范,将动态刷新进行服务列表维护

image

故实现agent时将首先实现这些规范,调用ribbon时将使用Polaris的路由策略返回ServerList,再由指定的负载均衡策略完成调用,主要的关键是替换掉原有负载均衡器,新建如下所示的负载均衡器,通过Polaris的routerAPI进行路由:

    public PolarisRoutingLoadBalancer(IClientConfig config, IRule rule, IPing ping,
                                      ServerList<Server> serverList, RouterAPI routerAPI,
                                      PolarisAgentProperties polarisAgentProperties) {
        super(config, rule, ping, serverList, null, new PollingServerListUpdater());
        this.routerAPI = routerAPI;
        this.polarisAgentProperties = polarisAgentProperties;
    }
拦截点

Ribbon会在LoadBalancerContext构造函数中初始化负载均衡器,于是在构造函数结束后将this.lb指向新建的PolarisRoutingLoadBalancer即可:

    public LoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) {
        this.lb = lb;
        initWithNiwsConfig(clientConfig);      
        // post拦截点  
    }