spring · 21 August 2021

Spring Cloud Service Registration with Eureka Server (Part 5): The Service Eviction Policy

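Eureka Server decides whether a registered client is still online through heartbeats: each client periodically sends a heartbeat (renewal) to the server, and as long as heartbeats keep arriving the server keeps the client's lease alive and the instance stays UP. When a client's heartbeats stop arriving, the server applies its own eviction policy and removes the instance whose lease has expired from the registry, so that other services are not routed to an instance that is no longer available.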

How is the eviction policy started?

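The earlier article on the startup process described in detail how Eureka Server is brought up. As mentioned there, when EurekaServerBootstrap initializes the server, it starts the evict timer task that performs service eviction. The code is as follows: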

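    protected void postInit() {
        // Start measuring the number of renewals received in the last minute
        renewsLastMin.start();
        // Cancel a previously scheduled eviction task, if there is one
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        // Schedule a new EvictionTask: first run after one interval, then repeatedly every interval
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }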

The evict task is started in postInit. The method is simple and consists of three steps:

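  • Start the renewsLastMin task, which counts the renewals received in the last minute
  • Check whether an evict task already exists and, if so, cancel it
  • Schedule the evict timer task, which runs every 60 seconds by default (eureka.server.eviction-interval-timer-in-ms, default 60000)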
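The three steps above follow a common Java pattern: keep the current TimerTask in an AtomicReference, cancel the old one, and hand a new one to java.util.Timer. The sketch below reproduces only that pattern under stated assumptions: the class EvictionSchedulerDemo and its start/shutdown methods are hypothetical names, and the evict pass is just a Runnable supplied by the caller. It is not Eureka's code.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class EvictionSchedulerDemo {
    // Hypothetical stand-ins for the registry's evictionTimer / evictionTaskRef fields
    private final Timer evictionTimer = new Timer("Eureka-EvictionTimer-sketch", true);
    private final AtomicReference<TimerTask> evictionTaskRef = new AtomicReference<>();

    // Same shape as postInit(): cancel any previous task, then schedule a fresh one
    TimerTask start(long intervalMs, Runnable evictPass) {
        TimerTask previous = evictionTaskRef.get();
        if (previous != null) {
            previous.cancel(); // a cancelled TimerTask never runs again
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                evictPass.run();
            }
        };
        evictionTaskRef.set(task);
        // schedule(task, delay, period): fixed-delay execution, first run after `delay`
        evictionTimer.schedule(task, intervalMs, intervalMs);
        return task;
    }

    void shutdown() {
        evictionTimer.cancel();
    }

    public static void main(String[] args) throws InterruptedException {
        EvictionSchedulerDemo demo = new EvictionSchedulerDemo();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // 50 ms instead of the 60000 ms default so the demo finishes quickly
        demo.start(50, () -> {
            System.out.println("evict pass at " + System.currentTimeMillis());
            threeRuns.countDown();
        });
        boolean done = threeRuns.await(2, TimeUnit.SECONDS);
        demo.shutdown();
        System.out.println("three passes completed: " + done);
    }
}
```

Timer.schedule with a period uses fixed-delay execution, so a pass that starts late (for instance after a long GC pause) also pushes later passes back; the compensation time in evict() exists for late runs like this.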

What does the task do?

[Figure: flowchart of the Eureka evict policy]

I think the flowchart above shows quite clearly how the evict method runs. For Eureka, the following points are worth noting:

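  • The evict timer task runs every 60 seconds by default.
  • When deciding whether an instance has expired, Eureka Server does not apply one fixed cutoff. It measures how much later than the configured interval the current run started (the time since the previous run minus the interval, never below 0) and adds this compensation to the expiry check, so the set of expired instances is determined dynamically.
  • When the number of expired instances exceeds the eviction limit, registrySize - (int) (registrySize * renewalPercentThreshold) (15% of the registry with the default threshold of 0.85), Eureka does not evict all of them at once. It evicts only min(expired count, eviction limit) instances, chosen at random.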
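The cap in the last point is plain integer arithmetic, and the evict() source further below picks which leases to remove with a partial Knuth (Fisher-Yates) shuffle. The sketch reproduces both under simplifying assumptions: expired leases are plain list elements, and the helper names evictionLimit and pickVictims are hypothetical. 0.85 is the default renewalPercentThreshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionLimitDemo {
    // evictionLimit = registrySize - (int) (registrySize * renewalPercentThreshold), as in evict()
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks min(expired.size(), limit) items uniformly at random with a partial
    // Fisher-Yates shuffle: after step i, positions 0..i hold the chosen items.
    static <T> List<T> pickVictims(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> items = new ArrayList<>(expired); // copy so the caller's list is untouched
        int toEvict = Math.min(items.size(), evictionLimit(registrySize, renewalPercentThreshold));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(items.size() - i);
            Collections.swap(items, i, next);
        }
        return new ArrayList<>(items.subList(0, Math.max(toEvict, 0)));
    }

    public static void main(String[] args) {
        System.out.println(evictionLimit(100, 0.85)); // 15
        System.out.println(evictionLimit(10, 0.85));  // 2, because (int) 8.5 == 8

        List<String> expired = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            expired.add("instance-" + i);
        }
        // 40 expired leases in a registry of 100: only 15 are evicted in this pass
        List<String> victims = pickVictims(expired, 100, 0.85, new Random(42));
        System.out.println(victims.size()); // 15
    }
}
```

Note the truncating cast: with 10 registered instances, 10 * 0.85 = 8.5 is truncated to 8, so the limit for that pass is 2.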

Source code walkthrough

We can skip the remaining details and go straight to how the evict method removes instances:

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        // If the registry currently does not allow leases to expire (e.g. while self-preservation is in effect), stop here; nothing is removed from the map
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();

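                    // A lease counts as expired when System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)
                    // additionalLeaseMs: compensation time, i.e. time since the previous evict run minus the configured interval (0 if the run is not late)
                    // lastUpdateTimestamp: the time of the client's most recent heartbeat, as recorded on the lease
                    // duration: the lease duration sent by the client (90 s by default), not the heartbeat interval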
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        // The key point of evict: there may be more expired leases than evictionLimit; instead of expiring all of them, take the smaller of the two numbers
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // The method that actually removes the instance
                internalCancel(appName, id, false);
            }
        }
    }
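The additionalLeaseMs argument is where the "dynamic" expiry from the earlier list comes in. The sketch below is modelled on the compensation logic of Eureka's EvictionTask, but it is a simplified re-implementation: SimpleLease and Compensator are hypothetical classes, and the current time is passed in explicitly (instead of reading the system clock) so the behaviour is easy to check.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ExpiryCheckDemo {
    // Simplified lease holding only what the expiry check reads (hypothetical class)
    static final class SimpleLease {
        final long lastUpdateTimestamp; // ms: last heartbeat as recorded on the lease
        final long durationMs;          // ms: lease duration, 90 s by default in Eureka

        SimpleLease(long lastUpdateTimestamp, long durationMs) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
            this.durationMs = durationMs;
        }

        // Same condition as in the evict() comments, with "now" passed in for testability
        boolean isExpired(long nowMs, long additionalLeaseMs) {
            return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
        }
    }

    // How much later than the configured interval the current run started (hypothetical class)
    static final class Compensator {
        private final long intervalMs;
        private final AtomicLong lastExecutionNanos = new AtomicLong(0L);

        Compensator(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        long compensationMs(long currNanos) {
            long lastNanos = lastExecutionNanos.getAndSet(currNanos);
            if (lastNanos == 0L) {
                return 0L; // first run: nothing to compensate
            }
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensation = elapsedMs - intervalMs;
            return compensation <= 0L ? 0L : compensation; // never negative
        }
    }

    public static void main(String[] args) {
        Compensator compensator = new Compensator(60_000);
        long t0 = TimeUnit.SECONDS.toNanos(1_000);
        System.out.println(compensator.compensationMs(t0));                                 // 0
        System.out.println(compensator.compensationMs(t0 + TimeUnit.SECONDS.toNanos(60)));  // 0
        System.out.println(compensator.compensationMs(t0 + TimeUnit.SECONDS.toNanos(135))); // 15000

        SimpleLease lease = new SimpleLease(0L, 90_000L);
        System.out.println(lease.isExpired(100_000L, 0L));      // true
        System.out.println(lease.isExpired(100_000L, 15_000L)); // false
    }
}
```

With a 90 s lease whose last heartbeat was at t = 0, a check at t = 100 s expires the lease when the task ran on time, but not when the task started 15 s late, because the 15 s of compensation pushes the deadline to 105 s. The server thus does not count its own delay against the clients.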

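As mentioned earlier, the registry object used here is InstanceRegistry, Spring Cloud's own extension of the Eureka registry. What this extension adds on the cancel path is publishing an EurekaInstanceCanceledEvent: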

    private void handleCancelation(String appName, String id, boolean isReplication) {
        log("cancel " + appName + ", serverId " + id + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
    }
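Publishing the event from the registry subclass lets application code react to evictions without touching Eureka's internals. The plain-Java sketch below shows that shape with hypothetical stand-in classes (BaseRegistry, EventPublishingRegistry, InstanceCanceled) and a simple listener list in place of Spring's event publishing. The call order, publish first and then delegate to the parent's internalCancel, is an assumption about how Spring Cloud's InstanceRegistry hooks handleCancelation into the cancel path; check it against the version you use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CancelEventDemo {
    // Hypothetical event type standing in for EurekaInstanceCanceledEvent
    static final class InstanceCanceled {
        final String appName;
        final String id;
        final boolean isReplication;

        InstanceCanceled(String appName, String id, boolean isReplication) {
            this.appName = appName;
            this.id = id;
            this.isReplication = isReplication;
        }
    }

    // Stand-in for AbstractInstanceRegistry, which owns the real removal logic
    static class BaseRegistry {
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            return true; // removal elided in this sketch
        }
    }

    // Stand-in for Spring Cloud's InstanceRegistry: publish the event, then delegate
    static class EventPublishingRegistry extends BaseRegistry {
        private final List<Consumer<InstanceCanceled>> listeners = new ArrayList<>();

        void addListener(Consumer<InstanceCanceled> listener) {
            listeners.add(listener);
        }

        private void handleCancelation(String appName, String id, boolean isReplication) {
            InstanceCanceled event = new InstanceCanceled(appName, id, isReplication);
            for (Consumer<InstanceCanceled> listener : listeners) {
                listener.accept(event);
            }
        }

        @Override
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            handleCancelation(appName, id, isReplication);
            return super.internalCancel(appName, id, isReplication);
        }
    }

    public static void main(String[] args) {
        EventPublishingRegistry registry = new EventPublishingRegistry();
        registry.addListener(e -> System.out.println("canceled " + e.appName + "/" + e.id));
        registry.internalCancel("ORDER-SERVICE", "i-1", false); // prints "canceled ORDER-SERVICE/i-1"
    }
}
```

In a real Spring Cloud application you would receive the actual EurekaInstanceCanceledEvent through a regular Spring application event listener rather than a hand-rolled listener list.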


The actual removal is still carried out in AbstractInstanceRegistry. The source code is as follows:

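    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try { // Acquire the read lock (Eureka uses it for mutations such as cancel; the write lock guards delta computation)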
            read.lock();
            CANCEL.increment(isReplication);
            // Look up the leases registered under appName
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // If the app exists, remove the lease of the expired instance
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                // Record the removed instance in the recently-canceled queue
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            // Remove this instance's overridden status, if any
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            // If no lease was registered for this instance, return immediately
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                // If the instance info exists, mark the instance as DELETED and record the change
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // Invalidate the cached entries for this instance in the responseCache's readWriteCacheMap
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }
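To see the order of operations in internalCancel without Eureka's types, here is a stripped-down sketch. Everything in it is a hypothetical simplification: leases are plain Strings, the queues hold descriptive strings, and invalidateCache is an empty placeholder for the response-cache invalidation. It keeps the locking shape of the original, holding the read lock of a ReentrantReadWriteLock for the whole removal; since the maps are concurrent, several cancels may run in parallel under the read lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CancelSketch {
    // appName -> (instanceId -> lease); a String stands in for Lease<InstanceInfo>
    final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();
    // Guarded by synchronized, like recentCanceledQueue in the excerpt above
    final Deque<String> recentCanceled = new ArrayDeque<>();
    // Thread-safe queue of change records (stands in for recentlyChangedQueue)
    final Queue<String> recentlyChanged = new ConcurrentLinkedQueue<>();
    final Map<String, String> overriddenStatus = new ConcurrentHashMap<>();

    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock read = rwLock.readLock();

    boolean internalCancel(String appName, String id) {
        read.lock();
        try {
            Map<String, String> gMap = registry.get(appName);
            String leaseToCancel = (gMap != null) ? gMap.remove(id) : null;
            synchronized (recentCanceled) {
                // Recorded even when the lease is missing, as in the original
                recentCanceled.add(System.currentTimeMillis() + " " + appName + "(" + id + ")");
            }
            overriddenStatus.remove(id);
            if (leaseToCancel == null) {
                return false; // nothing was registered under this id
            }
            recentlyChanged.add("DELETED " + appName + "/" + id);
            invalidateCache(appName);
            return true;
        } finally {
            read.unlock();
        }
    }

    // Hypothetical placeholder for invalidating cached registry payloads of appName
    void invalidateCache(String appName) {
    }

    public static void main(String[] args) {
        CancelSketch s = new CancelSketch();
        s.registry.computeIfAbsent("ORDER-SERVICE", k -> new ConcurrentHashMap<>()).put("i-1", "lease");
        System.out.println(s.internalCancel("ORDER-SERVICE", "i-1")); // true
        System.out.println(s.internalCancel("ORDER-SERVICE", "i-1")); // false: already removed
    }
}
```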

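This concludes the walkthrough of the key flows in Eureka Server. If you have suggestions or questions about the article, feel free to leave them in the comments.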