spring · 15 8 月, 2021 0

spring cloud 服务注册之Eureka Server(二) – 启动过程

在Eureka Server自动装配文章中,介绍了Eureka相关的启动组件,这篇文章主要介绍在启动过程中,各组件都是如何工作的。

InstanceRegistry

从类名可以看出,该类主要用于实现Instance信息的注册,保存Eureka Client注册上来的基本信息。下面我们来看一下InstanceRegistry的实现结构。

InstanceRegistry继承自PeerAwareInstanceRegistryImpl,创建实例的源码如下:

/**
 * Creates the Spring Cloud InstanceRegistry.
 *
 * Delegates the shared setup to the PeerAwareInstanceRegistryImpl constructor via
 * super(...) and then stores the two counts supplied by the caller.
 *
 * @param expectedNumberOfClientsSendingRenews initial expected number of clients sending renewals
 * @param defaultOpenForTrafficCount           count stored for opening the registry for traffic
 */
public InstanceRegistry(EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig, ServerCodecs serverCodecs,
            EurekaClient eurekaClient, int expectedNumberOfClientsSendingRenews,
            int defaultOpenForTrafficCount) {
        super(serverConfig, clientConfig, serverCodecs, eurekaClient);

        // Initial expected number of clients sending renewals; the original article says it
        // comes from eureka.server.expectedNumberOfRenewsPerMin with a default of 1.
        // NOTE(review): newer Spring Cloud versions appear to bind this from
        // eureka.instance.registry.expected-number-of-clients-sending-renews — confirm for your version.
        this.expectedNumberOfClientsSendingRenews = expectedNumberOfClientsSendingRenews;

        // The original article says this comes from eureka.server.defaultOpenForTrafficCount, default 1.
        // NOTE(review): likely bound under the eureka.instance.registry prefix instead — confirm.
        this.defaultOpenForTrafficCount = defaultOpenForTrafficCount;
    }

创建InstanceRegistry实例时,会通过super(...)同时初始化父类PeerAwareInstanceRegistryImpl对象,具体代码如下:

/**
 * Constructor of the base registry, invoked via super(...) from InstanceRegistry.
 *
 * Stores the EurekaClient, creates the one-minute replication rate counter and builds
 * the rule chain used to resolve an instance's overridden status.
 */
@Inject
    public PeerAwareInstanceRegistryImpl(
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            EurekaClient eurekaClient
    ) {
        super(serverConfig, clientConfig, serverCodecs);
        
        // The EurekaClient object used by this registry.
        this.eurekaClient = eurekaClient;
        
        // Replication rate counter over a 60 * 1000 ms (one-minute) window,
        // as the field name numberOfReplicationsLastMin indicates.
        this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
        // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
        // then we check the status of a potentially existing lease.
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
                new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
    }

对于InstanceRegistry而言,没有其他复杂的操作,只是将对象创建出来,并交由Spring进行托管。

RefreshablePeerEurekaNodes

PeerEurekaNodes主要用于维护当前Eureka Server的同级节点(peer)列表,即其他Eureka Server的url列表。该类有一个start()方法,用于开启定时任务定期刷新该列表,具体代码如下:

/**
 * Starts peer-node maintenance.
 *
 * Creates a single-threaded scheduler on a daemon thread named "Eureka-PeerNodesUpdater",
 * resolves and applies the peer URL list once, and then schedules periodic refreshes.
 * Any exception during this setup is rethrown as IllegalStateException. Finally, each
 * known replica node URL is logged.
 */
public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            
            // Update the PeerEurekaNodes information once, immediately.
            updatePeerEurekaNodes(resolvePeerUrls());
            
            // Task that re-resolves the peer URLs and updates PeerEurekaNodes.
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        // Catch everything: an exception escaping a scheduleWithFixedDelay
                        // task would suppress all of its subsequent executions.
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
       
            // Run the task with a fixed delay; the same interval is used as the initial delay.
            // Configurable via eureka.server.peer-eureka-nodes-update-interval-ms
            // (the original article gives a default of 10 minutes).
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

下面看一下resolvePeerUrls()方法,它返回同级Eureka Server的url列表(已剔除当前节点自身的url),具体代码如下:

/**
 * Resolves the peer Eureka Server URLs for this instance's zone.
 *
 * @return the discovery service URLs from the client config, with every URL that
 *         refers to this server itself (per isThisMyUrl) removed
 */
protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);

        // Get the server-url list for this zone from the current client config. The list
        // depends on region/zone; the original article says that when no zone-specific entry
        // exists the defaultZone list is used (that logic lives in EndpointUtils, not shown here).
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            // If the URL points at this server itself, remove it from the list.
            // idx only advances when nothing was removed, because removal shifts later elements left.
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

updatePeerEurekaNodes的代码其实很简单:它将resolvePeerUrls()从ClientConfig中解析出的server-url列表与本地缓存做比对,关闭已不存在的节点、创建新增的节点,然后更新本地缓存信息。

/**
 * Reconciles the cached peer nodes with a freshly resolved URL list.
 *
 * An empty list is ignored with a warning. Otherwise, nodes whose URL is no longer
 * present are removed and shut down, nodes for new URLs are created, and the node
 * list and URL set fields are replaced with the new values.
 *
 * @param newPeerUrls the newly resolved peer URLs
 */
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
       if (newPeerUrls.isEmpty()) {
           logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
           return;
       }

       // Compute the differences on copies so the cached peerEurekaNodeUrls set is not mutated:
       // toShutdown = old URLs - new URLs, toAdd = new URLs - old URLs.
       Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
       toShutdown.removeAll(newPeerUrls);
       Set<String> toAdd = new HashSet<>(newPeerUrls);
       toAdd.removeAll(peerEurekaNodeUrls);

       if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
           return;
       }

       // Remove peers no longer available (work on a copy of the current node list)
       List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

       if (!toShutdown.isEmpty()) {
           logger.info("Removing no longer available peer nodes {}", toShutdown);
           int i = 0;
           while (i < newNodeList.size()) {
               PeerEurekaNode eurekaNode = newNodeList.get(i);
               if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                   // i is not advanced here: removal shifts the next element into slot i.
                   newNodeList.remove(i);
                   eurekaNode.shutDown();
               } else {
                   i++;
               }
           }
       }

       // Add new peers
       if (!toAdd.isEmpty()) {
           logger.info("Adding new peer nodes {}", toAdd);
           for (String peerUrl : toAdd) {
               newNodeList.add(createPeerEurekaNode(peerUrl));
           }
       }

       // Replace the locally cached node list and URL set.
       this.peerEurekaNodes = newNodeList;
       this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
   }

EurekaServerContext

该类是EurekaServer实现中比较重要的一环,用于对EurekaServer上下文执行初始化操作。具体代码如下:

/**
 * Eureka server context (excerpt).
 *
 * On construction (@PostConstruct) it starts the peer nodes and initializes the registry.
 * On destruction (@PreDestroy) it shuts both down again.
 */
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);

    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    /**
     * Starts peer-node maintenance, then initializes the registry with those peer nodes.
     * Any exception from registry.init is rethrown wrapped in a RuntimeException.
     */
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
       
        // Start PeerEurekaNodes, which also schedules the periodic peer-node update task.
        peerEurekaNodes.start();
        try {
            // Initialize the InstanceRegistry's data with the started peer nodes.
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

    /** Shuts down the registry first, then the peer nodes. */
    @PreDestroy
    @Override
    public void shutdown() {
        logger.info("Shutting down ...");
        registry.shutdown();
        peerEurekaNodes.shutdown();
        logger.info("Shut down");
    }
    // (remaining members elided in this excerpt)
...
}

DefaultEurekaServerContext在Bean的初始化阶段(@PostConstruct)执行initialize()方法,该方法主要做两件事:

  • 开启定时任务,(默认)每10分钟根据server-url配置更新同级节点信息
  • 初始化InstanceRegistry中的实例数据

InstanceRegistry.init()初始化方法源码如下:

/**
 * Initializes the registry.
 *
 * Starts the replication rate counter, stores the peer nodes, initializes the response
 * cache, schedules the renewal-threshold update task and the remote-region registries,
 * and registers this object for monitoring. A monitoring-registration failure is only
 * logged.
 *
 * @param peerEurekaNodes the (already started) peer nodes
 */
@Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {

        // Start the numberOfReplicationsLastMin rate counter. The original article says it
        // periodically resets the last minute's replication count, running once per minute.
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;

        // Initialize the ResponseCache (the original article calls this Eureka Server's
        // three-level cache implementation).
        initializedResponseCache();

        // Schedule the periodic update of the renewal threshold. Per the original article, the
        // threshold decides whether instances may be dropped, e.g. during a network partition
        // (self-preservation).
        scheduleRenewalThresholdUpdateTask();
        // NOTE(review): presumably sets up registries for remote regions — body not shown here.
        initRemoteRegionRegistry();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

/**
 * Schedules updateRenewalThreshold() to run repeatedly on the timer.
 * The same interval is used both as the initial delay and as the period.
 */
private void scheduleRenewalThresholdUpdateTask() {
        // Interval from eureka.server.renewal-threshold-update-interval-ms
        // (the original article gives a default of 15 minutes).
        timer.schedule(new TimerTask() {
                           @Override
                           public void run() {
                               updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

/**
 * Recomputes the expected number of renewing clients and the renewals-per-minute threshold.
 *
 * Counts the registerable instances known to the local EurekaClient and, under the lock,
 * adopts that count only if it exceeds the configured percentage of the current expectation
 * or if self-preservation is disabled. Any failure is logged and swallowed.
 */
private void updateRenewalThreshold() {
        try {
            Applications apps = eurekaClient.getApplications();
            
            // Number of registerable instances currently known to the EurekaClient.
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            synchronized (lock) {
                // Update threshold only if the threshold is greater than the
                // current expected threshold or if self preservation is disabled.
                
                // The threshold is updated only when:
                // 1. count > renewalPercentThreshold * expectedNumberOfClientsSendingRenews (strictly greater), or
                // 2. self-preservation mode is disabled.
                // Per the original article, renewal-percent-threshold defaults to 0.85 and
                // self-preservation is enabled by default.
                // NOTE(review): the article names the properties eureka.renewal-percent-threshold and
                // eureka.server.should-enable-self-preservation; in Spring Cloud they are likely
                // eureka.server.renewal-percent-threshold and eureka.server.enable-self-preservation — confirm.
                // With self-preservation on, if the count falls to 85% or less of the expected value
                // (e.g. during a network partition), the threshold is not updated and, per the original
                // article, the server enters self-preservation.
                if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                        || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfClientsSendingRenews = count;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }

EurekaServerBootstrap

该类主要负责初始化EurekaServer的环境配置,并从同级Eureka Server同步instance信息,以保证各Eureka Server之间的数据一致性。该类主要包含contextInitialized和contextDestroyed两个方法,具体源码如下:

/**
 * Bootstraps the Eureka server when the servlet context starts.
 *
 * Initializes the Eureka environment and the server context, then publishes the server
 * context as a ServletContext attribute. Any failure is logged and rethrown as a
 * RuntimeException.
 *
 * @param context the servlet context to publish the EurekaServerContext into
 */
public void contextInitialized(ServletContext context) {
        try {
            // Initialize the Eureka environment: data center and deployment environment
            // (the original article says the default environment is "test").
            initEurekaEnvironment();
                        
            // Initialize the Eureka server context.
            initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    /**
     * Shuts down the Eureka server when the servlet context is destroyed.
     *
     * Removes the EurekaServerContext attribute, then destroys the server context and the
     * environment. Errors are logged and not rethrown.
     *
     * @param context the servlet context the EurekaServerContext was published into
     */
    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            context.removeAttribute(EurekaServerContext.class.getName());

            destroyEurekaServerContext();
            destroyEurekaEnvironment();

        }
        catch (Throwable e) {
            log.error("Error shutting down eureka", e);
        }
        log.info("Eureka Service is now shutdown...");
    }

在initEurekaServerContext()方法中,主要是从同级节点同步instance数据,并开启evict定时任务,用于剔除已下线的实例:

/**
 * Initializes the Eureka server context.
 *
 * Registers backward-compatible XStream converters, starts the AWS binder when running
 * on AWS, publishes the server context to EurekaServerContextHolder, copies the registry
 * from neighboring nodes, opens the registry for traffic, and registers monitoring stats.
 */
protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);

        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);

        log.info("Initialized server context");

        // Copy registry from neighboring eureka node
        int registryCount = this.registry.syncUp();
        // Open the registry for traffic using the synced count; this also starts the evict task.
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

InstanceRegistry中,包含了syncUp()方法,源码如下:

/**
 * Copies the registry from a neighboring Eureka Server node into this registry.
 *
 * <p>Makes up to {@code serverConfig.getRegistrySyncRetries()} attempts and stops as soon
 * as at least one instance has been copied. Between attempts it sleeps for
 * {@code serverConfig.getRegistrySyncRetryWaitMs()} ms. If that sleep is interrupted,
 * syncing stops and the thread's interrupt flag is restored.
 *
 * @return the number of instances registered into this registry
 */
@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    // NOTE(review): restore the interrupt flag (the quoted original does not)
                    // so callers further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            // Fetch the instance list from the other Eureka Server nodes through the EurekaClient.
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // Save the instance into this registry, using its lease duration.
                            // NOTE(review): the third argument is presumably the isReplication flag — confirm.
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            // Number of instances synced so far.
                            count++;
                        }
                    } catch (Throwable t) {
                        // One bad instance must not abort copying the rest.
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

openForTraffic()方法会根据同步到的实例数更新续约阈值,将当前实例状态设置为UP,并通过postInit()开启evict定时任务,具体代码如下:

/**
 * Opens the registry for traffic.
 *
 * Uses the number of instances obtained from peers as the expected number of renewing
 * clients and recomputes the renewal threshold. It records the startup time, optionally
 * primes AWS replica connections, marks this instance UP, and runs postInit().
 *
 * @param applicationInfoManager manager of this server's own InstanceInfo
 * @param count                  number of instances obtained from neighboring nodes
 */
@Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfClientsSendingRenews = count;
        
        // Update the renewals-per-minute threshold.
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        // AWS-only path; can be ignored when not running on AWS.
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }

        logger.info("Changing status to UP");
        // Set the status of this instance to UP.
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        
        // Post-initialization (starts the renewal counter and the eviction task).
        super.postInit();
    }

/**
 * Post-initialization.
 *
 * Starts the renewsLastMin counter, cancels any previously scheduled EvictionTask, and
 * schedules a new one on evictionTimer.
 */
protected void postInit() {
        renewsLastMin.start();
        // Cancel an existing eviction task first so that only one is ever scheduled.
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
      
        // Run the evict operation periodically; the initial delay and period both come from
        // serverConfig.getEvictionIntervalTimerInMs() and are in milliseconds.
        // The original article says the default is 60 seconds.
        // NOTE(review): the article wrote eureka.server.eviction-interval-time-in-ms=60; the getter name
        // suggests the property is eviction-interval-timer-in-ms, with 60 s being 60000 — confirm.
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

以上就是Eureka Server主要的启动过程,其中涉及到续约阈值更新定时任务、evict定时任务、PeerNodes更新任务等。下一章节将主要介绍Eureka的服务注册与Evict策略。