在上篇文章spring cloud 服务发现之Eureka Client(二)—自动装配中,介绍了Eureka Client自动装配的过程,其中设计到几个比较重要的类,包括ServiceRegistry, EurekaClient, EurekaDiscoveryClient类型。这些类型在Eureka Client整个生命周期中充当这比较重要的角色,在今天这章节中,将主要介绍DiscoveryClient对象,该对象主要包括了一下几个步骤:
- 注册当前实例
- 从eureka server中获取实例列表
- 心跳启动
类结构设计
在Spring Cloud中,使用了EurekaDiscoveryClient作为DiscoveryClient的实现,该类型的设计如下:
spring cloud在做封装的时候,是提取了单独一个DicoveryClient类,作为通用的DicoveryClient客户端来处理。这里在看源码的时候,会产生一个误区,需要多加注意。通过上面类图可以看出,spring cloud在原有的netflix的DicoveryClient基础之上,封装了CloudEurekaClient客户端,用于屏蔽对DicoveryClient使用的细节。
实例化CloudEurekaClient
对EurekaClient的使用,首先需要对EurekaClient的实例化,实例化通过CloudEurekaClient()构造函数进行,源码如下:
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
该构造函数声明了当前类型的依赖关系,需要创建:
ApplicationInfoManager对象EurekaClientConfig对象,在spring中实际使用的EurekaClientConfigBean对象AbstractDiscoveryOptionalArgs对象,该对象的最终使用的是MutableDiscoveryClientOptionalArgs对象ApplicationEventPublisher对象,该对象由AbstractApplicationContext对象创建,并加入到容器
DiscoveryClient实例化
通过对CloudEurekaClient实例化过程中,会实例化父类DicoveryClient对象,具体源码如下:
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
this(applicationInfoManager, config, args, ResolverUtils::randomize);
}
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 判断args参数是否为空, 在spring cloud环境中,使用的是MutableDiscoveryClientOptionalArgs对象
if (args != null) {
// 健康检查处理类
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
// 健康检查回调
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
// event监听事件列表
this.eventListeners.addAll(args.getEventListeners());
// 实例注册前置处理器
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
// 获取当前实例InstanceInfo信息
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
// 本地region applications缓存
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
// 获取当前需要fetch的regions列表, 通过eureka.client.featch-remote-regions-registry
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
// 多个fetch regsion采用,分割,在这里将会通过split的方式获取一个数组
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// 判断当前EurekaClient是否需要从eureka server 获取注册的实例列表, 通过eureka.client.fetch-registry配置,默认值为true
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 判断当前EurekaClient需要将当前实例注册到Eureka Server, 通过eureka.client.register-with-eureka 配置,默认值为true
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 该条件用于判断是否需要将当前实例注册到eureka server服务,并且是否需要从eureka server上获取实例列表
// 当我们在启动Eureka Server的时候, 这两项我们都设置为false
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
// 实例regsion checker对象
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
// DiscoveryManager是一个单例,用于保存当前EurekaClient对象,以及与Client相关的Config信息
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 当Client 中配置设置为 eureka.client.register-with-eureka 或者 eureka.client.fetch-registry 为true时,就会初始化定时任务
// scheduler为线程池, 主要为DicoveryClient相关, 并且在线程池中的所有线程都是为守护线程,当主线程stop后,对应的任务前程也将会退出
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 初始化DicoveryClient Heartbeat线程池, 线程线程为1
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 缓存刷新线程池初始化,用于处理本地实例刷新操作。
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
// 该方法主要初始化EurekaHttpClients对象
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
// 判断当前eureka是否通过dns的方式从EurekaServer同步实例列表, 通过eureka.client.use-dns-for-featching-service-urls配置,默认值为false
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
// instance regsion checker对象,同步实例的regsion信息
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 判断是否需要从eureka server同步实例信息, 如果需要,则从eureka server同步,并保存applications信息到本地
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 当前有两个判断条件, 第一个为当前实例是否需要注册实例信息到eureka server
// 第二个为在初始化过程中是否需要注册, 通过 eureka.client.should-enforce-registration-at-init配置,默认为false
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// 注册实例信息
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// 启动定时任务, 包括了刷新缓存,心跳信息等
initScheduledTasks();
try {
// 暴露监控信息
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
// 保存全局EurekaClient对象
DiscoveryManager.getInstance().setDiscoveryClient(this);
// 保存全局的eureka client config对象
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
scheduleServerEndpointTask()
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
AbstractDiscoveryClientOptionalArgs args) {
// 通过args获取filter列表
Collection<?> additionalFilters = args == null
? Collections.emptyList()
: args.additionalFilters;
// 获取eurekaJerseyClient对象
EurekaJerseyClient providedJerseyClient = args == null
? null
: args.eurekaJerseyClient;
TransportClientFactories argsTransportClientFactories = null;
if (args != null && args.getTransportClientFactories() != null) {
argsTransportClientFactories = args.getTransportClientFactories();
}
// Ignore the raw types warnings since the client filter interface changed between jersey 1/2
@SuppressWarnings("rawtypes")
TransportClientFactories transportClientFactories = argsTransportClientFactories == null
? new Jersey1TransportClientFactories()
: argsTransportClientFactories;
// 是否使用ssl context
Optional<SSLContext> sslContext = args == null
? Optional.empty()
: args.getSSLContext();
// host name 校验器
Optional<HostnameVerifier> hostnameVerifier = args == null
? Optional.empty()
: args.getHostnameVerifier();
// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
// transport client 工厂对象获取
eurekaTransport.transportClientFactory = providedJerseyClient == null
? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
: transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
// application source 对象创建
ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
@Override
public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
long delay = getLastSuccessfulRegistryFetchTimePeriod();
if (delay > thresholdInMs) {
logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
thresholdInMs, delay);
return null;
} else {
return localRegionApps.get();
}
}
};
// 通过EurekaHttpClients创建bootstrapResolver对象
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
clientConfig,
transportConfig,
eurekaTransport.transportClientFactory,
applicationInfoManager.getInfo(),
applicationsSource,
endpointRandomizer
);
// 如果需要将当前实例注册到eureka server, 执行该段逻辑
if (clientConfig.shouldRegisterWithEureka()) {
EurekaHttpClientFactory newRegistrationClientFactory = null;
EurekaHttpClient newRegistrationClient = null;
try {
// 创建regsitration client factory对象
newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
transportConfig
);
// 获取registration Eureka http client对象
newRegistrationClient = newRegistrationClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
// 将创建的对象保存到eureka transport中
eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
eurekaTransport.registrationClient = newRegistrationClient;
}
// new method (resolve from primary servers for read)
// Configure new transport layer (candidate for injecting in the future)
// 判断是否需要从eureka 获取注册实例列表
if (clientConfig.shouldFetchRegistry()) {
EurekaHttpClientFactory newQueryClientFactory = null;
EurekaHttpClient newQueryClient = null;
try {
// 创建query http client工厂类
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource,
endpointRandomizer
);
// 创建query http client对象
newQueryClient = newQueryClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
// 将数据保存到transport中
eurekaTransport.queryClientFactory = newQueryClientFactory;
eurekaTransport.queryClient = newQueryClient;
}
}
这个方法主要是填充EurekaTransport对象,通过EurekaClientFacotry对象创建EurekaHttpClient对象,在EurekaClient中,将注册和创建的客户端分开保存,便于后面使用.
fetchRegistry()
该方法主要从eureka server中fetch全量的注册信息, 具体源码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
// 开启计时器
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// 获取所有的applications, 第一次启动时,该值为空
Applications applications = getApplications();
// 该处有几个判断条件:
// 1. 判断是否禁用了delta,根据注释, 开启将会降低客户端fetch数据的频率,默认值为false. 可以根据eureka.client.disable-delta配置
// 2. 判断是否根据VIP地址刷新, 默认值为NULL
// 3. 是否强制执行全量同步, 在应用第一次启动时,为false
// 4. applications对象是否为null, 在DiscoveryClient初始化的时候,就已经默认包含了Applications对象
// 5. applications中已经注册的applicaions的数量是否为0, 在初次启动的时候,该值为0, 因此该条件为true
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 获取并存储全量的注册信息
getAndStoreFullRegistry();
} else {
// 获取并更新delta列表
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// 发送缓存更新的Event事件,该事件主要是在netflix内部进行处理, 通过EurekaEventListener处理,而该listener的配置,则是放在args中进行配置的
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// 尝试更新当前instance在EurekaServer上的状态
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
getAndStoreFullRegistry()
该方法主要是从eureka server上同步全量的注册信息, 具体代码如下:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 判断当前EurekaClient是否通过VIP方式获取远程列表, 如果不是,则通过getApplications()方法获取远程注册应用列表
// 如果采用VIP的方式, 则通过getVIP()方法获取远程应用列表
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
// 如果通过http访问返回的结果为200, 则获取applications对象信息
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
// 通过cas的方式更新当前fetch次数,如果更新成功,则将获取到的applications存入本地缓存中
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
onCachedRefreshed()
当从Eureka Server中同步完成Applicaions列表之后, 则发送CacheRefreshedEvent事件, 让EurekaEventListener事件能够有机会执行, 具体代码如下:
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
/**
* Send the given event on the EventBus if one is available
*
* @param event the event to send on the eventBus
*/
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener : eventListeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
}
}
}
这里需要注意的是,这里的事件并不是spring cloud内部的事件通知机制,而是由netflix提供的事件机制,因此,这里不能混淆。
updateInstanceRemoteStatus()
更新当前实例的远程实例状态信息, 具体代码如下:
private synchronized void updateInstanceRemoteStatus() {
// Determine this instance's status for this app and set to UNKNOWN if not found
InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
if (instanceInfo.getAppName() != null) {
// 从本地缓存中获取当前appName对应的远程实例信息
Application app = getApplication(instanceInfo.getAppName());
if (app != null) {
// 判断当前实例是否包含了当前启动的instance信息
InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
if (remoteInstanceInfo != null) {
// 如果包含, 则将当前当前远程实例状态与远程实例的状态保持一致
currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
}
}
}
// 如果远程实例不存在,则将当前远实例状态重置为UNKOWN
if (currentRemoteInstanceStatus == null) {
currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
}
// Notify if status changed
// 因为实例在启动过程中,状态为UNKOWN,如果在启动过程中,实例的状态发生了改变, 则需要发送通知,告知状态变更
if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
// 内部发送StatusChangeEvent状态通知消息
onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
// 更新当前内部的远程实例状态
lastRemoteInstanceStatus = currentRemoteInstanceStatus;
}
}
register()
该方法作为Client端启动中可选择的一环,实际上是将当前实例注册到Eureka Server中,这是该方法的调用默认为关闭状态, 该方法在实例化中被调用需要满足两个条件:
eureka.client.register-with-eureka配置值为true, 该配置默认开启eureka.client.should-enforce-registration-at-init配置值为true, 该配置默认为false
其实在spring-cloud中,默认是不建议在实例化过程中注册实例,我觉得最主要的原因为在实例化过程中,容器并没有完全准备好,还无法对外提供服务。此时注册上去,可能会导致依赖服务调用失败的情况发生。
具体代码如下:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// 通过EurekaHttpClient对象,将instance信息注册到EurekaServer服务中
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
initScheduledTasks()
在之前的流程中,介绍到了初始化了三个重要的线程池,用于执行定时任务,该方法则主要是用于初始化所有的定时任务,具体执行代码如下:
private void initScheduledTasks() {
// 当开启fetch之后, 则需要创建定时fetch的定时任务, 通过 eureka.client.fetch.registry 来开启
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 该配置主要配置了缓存的更新间隔时间, 通过eureka.client.regsitry-fetch-interval-seconds来配置,默认值为30秒
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// 该配置用于设置在fetch操作发生失败后, 最大的重试时间, 该值通过eureka.client.cache-refresh-executor-exponential-back-off-bound配置,默认时间为10秒
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 创建TimedSupervisorTask任务,并提交到线程池调用,默认每隔30秒执行一次
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 判断是否开启注册instance, 通过eureka.client.register-with-eureka, 默认值为true
if (clientConfig.shouldRegisterWithEureka()) {
// 从instance中获取续期时间间隔, 默认续期时间为 30秒
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// 获取当续期失败后,重试的最大时间间隔,默认值为10秒, 可以通过eureka.client.heartbeat-executor-exponential-back-off-bound配置
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// 开启心跳任务,默认每隔30秒执行一次
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 该类主要为创建单个线程,同步当前实例的状态到eureka server服务端
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 创建 EventListener对象,用于处理instance状态变换时间信息
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// 当前listener用于处理instance状态变更的时间通知, 前面主要是做状态的输出, 最主要的还是调用replicator的onDemandUpdate方法
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 该配置用于判断是否开启本地状态的更新,通过ApplicationInfoManager触发register/update操作
// 该配置默认处于开启状态,即默认值为true; 可以通过eureka.client.on-demand-update-status-change配置更改
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
// 当开始配置后,statusChangeListener对象将会被注册到ApplicationInfoManager中, 用于处理StatusChangeEvent 时间
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启regsiter与update同步任务, 保证eureka 服务端的任务为最新状态
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
InstanceInfoReplicator
在初始化过程中,可以看到,注册instance信息以及更新信息都是通过InstanceInfoReplicator完成的,构造代码如下:
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
this.instanceInfo = instanceInfo;
// 创建线程池, 核心线程为1
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true)
.build());
// 保存任务Future
this.scheduledPeriodicRef = new AtomicReference<Future>();
// 当前任务状态
this.started = new AtomicBoolean(false);
// 限流的实现
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
// 同步间隔时间, 默认为30秒
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
// 计算每分钟能够同步多少次, 默认burstSize = 2, 则每分钟能够同步4次
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
在DiscoveryClient同步的过程中, 该任务的任务则是通过start()方法启动,对应源码为:
// 开始方法会传入一个参数, 用于判断延迟多久执行当前任务, 该延迟时间,我任务应当和容器的启动时间保持一致, 避免容器处于不可用状态
// 该值通过eurek.client.initial-instance-info-replication-interval-seconds配置,默认值为40秒
public void start(int initialDelayMs) {
// 开始方法只会被执行一次, 通过cas方法将当前任务状态设置为started状态
if (started.compareAndSet(false, true)) {
// 将当前instance状态设置为dirty状态, 比记录dirty时间值
instanceInfo.setIsDirty(); // for initial register
// 调用执行任务, 延迟40秒执行
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
// 绑定当前执行任务
scheduledPeriodicRef.set(next);
}
}
通过以上方法可以得知, 当前类型也作为任务本身执行,因此查看run方法, 具体源码如下:
public void run() {
try {
// 刷新当前instance info的信息, 该方法主要判断instance info的信息是否发生变化, 如果发生变化, 则instance info将会标记为dirty状态
discoveryClient.refreshInstanceInfo();
// 判断当前的instance是否被标记为dirty状态, 如果为dirty, 则返回dirty的时间
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 当instance被标记为dirty之后, 则重新将当前的instance注册到eureka server服务中
discoveryClient.register();
// 取消instance的dirty状态
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 重新执行当前任务,默认每隔30秒钟,执行,并绑定当前执行任务future对象
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
当我们在上面代码分析中得知, 当instance的状态发生变更的时候,会发送一个StatusChangeEvent的事件信息, 当在处理事件时,实际上是调用了onDemanUpdate()方法,具体代码如下:
public boolean onDemandUpdate() {
// 该主要是为了限流,防止过多的请求
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
// 判断线程池是否处于shutdown状态
if (!scheduler.isShutdown()) {
// 提交一个任务执行
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
// 获取当前绑定的任务Future对象
Future latestPeriodic = scheduledPeriodicRef.get();
// 如果对应的future没有执行完成时,取消当前任务. 注意当前任务不会打断线程执行
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
// 重新执行run方法,开启新的任务
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
EurekaAutoServiceRegistration
在启动过程中比较重要的一个类就是EurekaAutoServiceRegistration类,该类主要维护了InstanceInfo在启动过程中的状态变更, 以及能够及时将状态变更同步到EurekaServer, 类型依赖关系如下:
在DiscoveryClient启动过程中,伴随着InstanceInfo状态的变化,状态变更流程如下:
EurekaAutoServiceRegistration完成的就是STARTING -> UP这一步的状态流转, 通过以上类图可以看出, EurekaAutoServiceRegistration 实现了SmartLifecyle类,因此在Spring容器化启动过程中,start()方法将会被回调, 因此我们查看该类的start()方法源码:
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
// 在初始化过程中,该端口默认值为0, 如果不为0, 则将非安全端口或者安全端口进行绑定
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
// 当前registration状态没有处于running状态,并且nonSecurePort端口>0时, 条件才满足
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 调用ServiceRegistry register方法,闯入当前的EurekaRegistration对象
this.serviceRegistry.register(this.registration);
// 发送InstanceRegisteredEvent时间
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
// 将当前的状态标记为running状态
this.running.set(true);
}
}
ServiceRegistry
当前ServiceRegistry类主要负责InstanceInfo的状态变更,以及eureka http client对象的初始化,并注册HealthCheckerHandler对象. 具体源码如下:
public void register(EurekaRegistration reg) {
// 该方法用于初始化EurekaHttpClient对象
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// 更新ApplicationInfoManager中instance状态为初始状态
// 这个地方主要依赖了CloudEurekaInstanceConfig对象,在初始化状态InitialStatus状态为UP状态
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 注册HealthCheckHandler对象,处理与HealthCheck有关的信息
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
在register方法中,会直接更新ApplicationInfoManager中的InstanceStatus, 更新instance status之后,会触发StatusChangeEvent时间,这里就与DiscoveryClient中初始化的InstanceInfoReplicator相对应。能够及时处理instance 状态变更。
至此,整个Eureka Client实例化流程整体梳理完毕,后面将给出比较完整的流程图.
总结
在Eurek Client初始化过程中,会初始化相关很多相关信息,spring cloud在这基础上做了很多的扩展,包括了服务状态、状态变更、心跳机制、实例同步任务等相关。流程还是比较长,需要比较长的时间进行梳理。以下给出了主要Discovery Client相关的主要流程图,希望可以帮到大家:


