xinrong2019 / xinrong2019.github.io

My Blog
https://xinrong2019.github.io
1 stars 1 forks source link

20190508SpringBoot应用是如何注册到注册中心的?(二)注册的时机是什么? #10

Open xinrong2019 opened 5 years ago

xinrong2019 commented 5 years ago

4月29号跟了一部分EnableDiscoveryClient注解的源码,准备研究如何将SpringCloud应用注册到注册中心的,今天继续。

1、调用invokeBeanFactoryPostProcessors方法,在ConfigurationClassPostProcessor中解析、导入配置类

在Spring应用启动的流程中,bean定义加载到beanFactory后,bean初始化前,会调用invokeBeanFactoryPostProcessors方法,该方法在AbstractApplicationContext类的refresh方法中。

这是自动注解配置加载的入口。

/**
     * Instantiate and invoke all registered BeanFactoryPostProcessor beans,
     * respecting explicit order if given.
     * <p>Must be called before singleton instantiation.
     */
    protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
        PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());<1>

        // Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
        // (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
        if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
            beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
            beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
        }
    }
<1> 处理AbstractApplicationContext的 post-processor ```java // Do not initialize FactoryBeans here: We need to leave all regular beans // uninitialized to let the bean factory post-processors apply to them! // Separate between BeanDefinitionRegistryPostProcessors that implement // PriorityOrdered, Ordered, and the rest. List currentRegistryProcessors = new ArrayList(); // First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered. String[] postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false); for (String ppName : postProcessorNames) { if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {<1> currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class)); processedBeans.add(ppName); } } sortPostProcessors(currentRegistryProcessors, beanFactory); registryProcessors.addAll(currentRegistryProcessors); invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);<2> currentRegistryProcessors.clear(); ``` <1> 首先,会调用实现了PriorityOrdered接口的BeanDefinitionRegistryPostProcessors <2> 调用BeanDefinitionRegistryPostProcessors ```java /** * Invoke the given BeanDefinitionRegistryPostProcessor beans. */ private static void invokeBeanDefinitionRegistryPostProcessors( Collection postProcessors, BeanDefinitionRegistry registry) { for (BeanDefinitionRegistryPostProcessor postProcessor : postProcessors) { postProcessor.postProcessBeanDefinitionRegistry(registry); } } ``` 接着会调用ConfigurationClassPostProcessor,就是对配置类处理。 ```java /** * Derive further bean definitions from the configuration classes in the registry. */ @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) { int registryId = System.identityHashCode(registry); if (this.registriesPostProcessed.contains(registryId)) { throw new IllegalStateException( "postProcessBeanDefinitionRegistry already called on this post-processor against " + registry); } if (this.factoriesPostProcessed.contains(registryId)) { throw new IllegalStateException( "postProcessBeanFactory already called on this post-processor against " + registry); } this.registriesPostProcessed.add(registryId); processConfigBeanDefinitions(registry); } ``` ConfigurationClassPostProcessor类的postProcessBeanDefinitionRegistry方法处理配置类的定义,内部调用processConfigBeanDefinitions方法。 接着调用ConfigurationClassParser进行注解解析,解析每一个@Configuration注解的bean。 ```java // Parse each @Configuration class ConfigurationClassParser parser = new ConfigurationClassParser( this.metadataReaderFactory, this.problemReporter, this.environment, this.resourceLoader, this.componentScanBeanNameGenerator, registry); Set candidates = new LinkedHashSet(configCandidates); Set alreadyParsed = new HashSet(configCandidates.size()); do { parser.parse(candidates); parser.validate(); Set configClasses = new LinkedHashSet(parser.getConfigurationClasses()); configClasses.removeAll(alreadyParsed); ``` 在ConfigurationClassParser类的doProcessConfigurationClass中,解析@PropertySource、@ComponentScan、@Import、@ImportResource等注解和@Bean注解的方法。 最后回调用processDeferredImportSelectors()方法 ```java public void parse(Set configCandidates) { this.deferredImportSelectors = new LinkedList(); for (BeanDefinitionHolder holder : configCandidates) { BeanDefinition bd = holder.getBeanDefinition(); try { if (bd instanceof AnnotatedBeanDefinition) { parse(((AnnotatedBeanDefinition) bd).getMetadata(), holder.getBeanName()); } else if (bd instanceof AbstractBeanDefinition && ((AbstractBeanDefinition) bd).hasBeanClass()) { parse(((AbstractBeanDefinition) bd).getBeanClass(), holder.getBeanName()); } else { parse(bd.getBeanClassName(), holder.getBeanName()); } } catch (BeanDefinitionStoreException ex) { throw ex; } catch (Throwable ex) { throw new BeanDefinitionStoreException( "Failed to parse configuration class [" + bd.getBeanClassName() + "]", ex); } } processDeferredImportSelectors(); } ``` 接着调用EnableDiscoveryClientImportSelector的selectImports方法,选择导入的注解配置。 ### 小结: 从invokeBeanFactoryPostProcessors这个入口会进入EnableDiscoveryClientImportSelector类,执行selectImports方法,选择要导入的配置类。 ![image](https://user-images.githubusercontent.com/48017262/57354249-9b405700-719d-11e9-9e72-a553a8908674.png) 导入了两个类: org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration和org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration ConsulDiscoveryClientConfiguration的导入逻辑在EnableDiscoveryClientImportSelector的父类SpringFactoryImportSelector中。 ```java String[] imports = super.selectImports(metadata); ``` 父类的selectImports方法会调用SpringFactoriesLoader.loadFactoryNames。 ```java // Find all possible auto configuration classes, filtering duplicates List factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); ``` SpringFactoriesLoader.loadFactoryNames做了什么? ```java Enumeration urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); ``` 从类加载路径上加载配置META-INF/spring.factories。 最后,在spring-cloud-consul-discovery模块下的META-INF/spring.factories文件中,有org.springframework.cloud.client.discovery.EnableDiscoveryClient的实现 ``` org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.consul.discovery.RibbonConsulAutoConfiguration,\ org.springframework.cloud.consul.discovery.configclient.ConsulConfigServerAutoConfiguration,\ org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistrationAutoConfiguration,\ org.springframework.cloud.consul.serviceregistry.ConsulServiceRegistryAutoConfiguration # Discovery Client Configuration org.springframework.cloud.client.discovery.EnableDiscoveryClient=\ org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.consul.discovery.configclient.ConsulDiscoveryClientConfigServiceBootstrapConfiguration ``` 接着,我在ConsulDiscoveryClientConfiguration这个配置类的consulDiscoveryClient方法打了断点, ```java @Bean @ConditionalOnMissingBean public ConsulDiscoveryClient consulDiscoveryClient(ConsulDiscoveryProperties discoveryProperties, final ApplicationContext context) { ConsulDiscoveryClient discoveryClient = new ConsulDiscoveryClient(consulClient, discoveryProperties, new LifecycleRegistrationResolver(context)); discoveryClient.setServerProperties(serverProperties); //null ok return discoveryClient; } ``` 这里是使用Java注解配置Consul,在SpringCloud里基本都是这么加载配置的,属于最基础的知识点了。 ## 2、调用finishBeanFactoryInitialization实现配置类实例化,实例化consulDiscoveryClient 这里我根据调用栈找了下入口,是AbstractApplicationContext的finishBeanFactoryInitialization方法。 ```java // Instantiate all remaining (non-lazy-init) singletons. finishBeanFactoryInitialization(beanFactory); ``` 这里熟悉Spring的同学都知道,这里是做饿汉单例的初始化工作。 具体的,是实例化一个consulDiscoveryClient类,就是上面ConsulDiscoveryClientConfiguration配置类配置的,会将serverProperties设置到consulDiscoveryClient中,就是配置文件配的内容,比如端口、应用名、servletPath等等。 ## 3、发布容器加载完成事件(EmbeddedServletContainerInitializedEvent),触发注册ConsulServiceRegistry 在容器加载完成,会发布一个内置servlet容器已经初始化了的事件(EmbeddedServletContainerInitializedEvent),然后通过广播,调用所有的监听器,我们这里使用Consul做注册中心,会调用ConsulAutoServiceRegistration这个监听器。 ConsulAutoServiceRegistration类层次结构图 ![image](https://user-images.githubusercontent.com/48017262/57495969-2a1ab400-7303-11e9-8955-7151210bdb10.png) ConsulAutoServiceRegistration继承自AbstractAutoServiceRegistration,AbstractAutoServiceRegistration继承自AbstractDiscoveryLifecycle,AbstractDiscoveryLifecycle实现了好几个接口 ```java @Deprecated public abstract class AbstractDiscoveryLifecycle implements DiscoveryLifecycle, ApplicationContextAware, ApplicationListener ``` 这里可以看到关心的事件是EmbeddedServletContainerInitializedEvent。 一旦触发(或者叫事件发布)EmbeddedServletContainerInitializedEvent,会调用AbstractDiscoveryLifecycle类的onApplicationEvent方法。 ```java @Override @Deprecated public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) { // TODO: take SSL into account // Don't register the management port as THE port if (!"management".equals(event.getApplicationContext().getNamespace())) { this.port.compareAndSet(0, event.getEmbeddedServletContainer().getPort()); this.start(); } } ``` start方法 ```java // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && getConfiguredPort() > 0) { register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } ``` 只要端口合法,会去调用registry方法,这里基本很清楚了,已经看到了将会去注册到注册中心的代码了。 接着会调用ConsulServiceRegistry的register方法,ConsulServiceRegistry实现了ServiceRegistry接口。这个接口是spring-cloud-commons模块下的接口,抽象了服务注册和注销的行为,传入ConsulRegistration。 ```java public class ConsulRegistration implements Registration { private final NewService service; public ConsulRegistration(NewService service) { this.service = service; } public NewService getService() { return service; } public String getInstanceId() { return getService().getId(); } public String getServiceId() { return getService().getName(); } } ``` ConsulServiceRegistry ```java @Override public void register(ConsulRegistration reg) { log.info("Registering service with consul: " + reg.getService()); try { client.agentServiceRegister(reg.getService(), properties.getAclToken()); if (heartbeatProperties.isEnabled() && ttlScheduler != null) { ttlScheduler.add(reg.getInstanceId()); } } catch (ConsulException e) { if (this.properties.isFailFast()) { log.error("Error registering service with consul: " + reg.getService(), e); ReflectionUtils.rethrowRuntimeException(e); } log.warn("Failfast is false. Error registering service with consul: " + reg.getService(), e); } } ``` 接着调用AgentConsulClient类的agentServiceRegister方法。 ```java @Override public Response agentServiceRegister(NewService newService, String token) { UrlParameters tokenParam = token != null ? new SingleUrlParameters("token", token) : null; String json = GsonFactory.getGson().toJson(newService); RawResponse rawResponse = rawClient.makePutRequest("/v1/agent/service/register", json, tokenParam); if (rawResponse.getStatusCode() == 200) { return new Response(null, rawResponse); } else { throw new OperationException(rawResponse); } } ``` 内部逻辑比较简单,调用consul的注册接口,/v1/agent/service/register,实现注册。 再回到AbstractDiscoveryLifecycle方法 ```java // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && getConfiguredPort() > 0) { register(); if (shouldRegisterManagement()) { //<1> registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));<2> this.running.compareAndSet(false, true);<3> } ``` <1> 如果需要注册管理控制台,就注册一下; <2> 发布InstanceRegisteredEvent事件。 <3> 通过cas切换运行状态。 这其中还有一些其他的细节,我只是跟了主线代码,我这里搜了下上面的<2>,搜了下谁会关注服务实例注册的事件InstanceRegisteredEvent。 有ZuulProxyConfiguration ```java if (event instanceof InstanceRegisteredEvent) { reset(); } ``` 这里猜测一下:网关需要聚合服务,一旦有新的实例从注册中心上下线,需要通知到网关,进行服务提供者信息更新。 reset就是刷新handlerMapping。 另一个搜到的关注InstanceRegisteredEvent事件的是DiscoveryClientHealthIndicator类, ```java @Override public void onApplicationEvent(InstanceRegisteredEvent event) { if (this.discoveryInitialized.compareAndSet(false, true)) { log.debug("Discovery Client has been initialized"); } } ``` 也很好理解,提供者上下线需要更新健康状态。 至此,关于应用是如何注册到注册中心的,已经分析完毕。 ## 总结: 1、调用invokeBeanFactoryPostProcessors方法,在ConfigurationClassPostProcessor中解析、导入配置类 2、调用finishBeanFactoryInitialization实现配置类实例化,实例化consulDiscoveryClient 3、调用finishRefresh发布容器加载完事件,进行服务注册
xinrong2019 commented 5 years ago

自我介绍直接扯最近做的需求,然后扯一个最近看的,比如springboot应用是如何注册到注册中心的。最后问什么基础就说什么基础,复习最近的面试遇到的问题,比如如何保证消息不被重发,如何保证消息的顺序性。

1、最近有一个需求,实现导出进度条展示。

我想了两种方案,整体的思路类似,先查出总数返回给前台,然后后台将数据分成多段,开始多线程查询,每一段查询完返回给前端,前端根据每一段的查询结果显示进度,在内存中汇总数据,最后查完返回给前端。

在内存中汇总会不会满,会不会OOM?

线上数据几十万条,不会过大。

如果很大怎么办?一个文件总共超过几个G,如何实现这个功能?

可以对下载服务单独扩容,横向扩展。事先将总数据量算出来,然后分发到多个机器上处理下载请求,将这些文件逐个返回给客户端。

横向扩展下载服务,如果其中有一台服务挂了怎么办?

服务挂了,需要对下载请求做一个记录,将分发到没台下载服务集群中的任务提及到MQ或者Redis,所有机器下载完成,将MQ或者redis中的任务消费掉或者删除掉。如果有没有成功的,通过定时任务定期从redis中查询下载任务,去对应的机器下载,直到下载完成,或者用户手动取消。

第二种方案,

可以将每一段多线程下载。

2、结合项目讲一下自己对项目的理解,比如不足:存在单点,模块开发不灵活,

这里临时看看翟永超的微服务专题。思考微服务的优点和缺点。

多研究些架构,少谈些框架(1) -- 论微服务架构的核心概念

白话:服务降级与熔断的区别

请不要在“微服务”的狂热中迷失自我!

那些没说出口的研发之痛,做与不做微服务的几大理由

拆分带来的需要解决的技术问题?

分布式事务

如何实现分布式事务?

基于MQ的最终一致性补偿策略

如何保证消费者的消费消息的幂等性?

如何保证生产者的发送消息的可靠性?

如何保证消息的顺序性?

消息队列如何实现高可用?

如何确保消息正确地发送至 RabbitMQ?

如何确保消息接收方消费了消息?

如何避免消息重复投递或重复消费?

什么是死信队列?

RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分别用于解决什么问题?

RabbitMQ 是否会弄丢数据?

3、用过哪些锁?

4、SpringCloud如何做负载均衡?

5、项目中如何做权限认证(Spring Security)

6、项目中用过缓存吗?

基于Redis的分布式缓存,整合了Spring Session。

如何用?

了解原理吗?

xinrong2019 commented 5 years ago

Spring、SpringMVC、SpringBoot、Mybatis,选一个整体说一下,要有体系。

MySQL的隔离级别说一下?

为什么用InnoDB引擎?和MyISAM的区别?

支持事务、支持行级锁和表级锁、更高的并发量

查询不加锁。

xinrong2019 commented 5 years ago

数据库索引的原理说一下?

MySql 三大知识点——索引、锁、事务

xinrong2019 commented 5 years ago

Redis的持久化方式说一下?各自优缺点

Redis过期策略说一下?

如果有大量的 key 需要设置同一时间过期,一般需要注意什么?

Redis 使用场景

数据缓存 会话缓存 消息队列 分布式锁

Redis主从复制原理说一下?

Redis集群如何做高可用?

Redis分区说一下?

xinrong2019 commented 5 years ago

代理模式写一下?

命令模式和策略模式的区别?

xinrong2019 commented 5 years ago

Java 提供了哪些阻塞队列的实现?

http://svip.iocoder.cn/Java/Concurrent/Interview/

线程池执行任务的过程说一下?

线程池队列满了,此时提交一个任务,会发生什么?