spring · 16 8 月, 2021 0

spring cloud 服务注册之Eureka Server(三) – 服务注册

在前面的文章中,介绍了Eureka Server的启动流程,以及启动过程中执行的操作信息。本章将会介绍服务注册相关的实现。服务注册中主要介绍Instance信息通过register的方式保存到Registry的业务逻辑,以及如何实现数据同步。

数据初始化

spring cloud 服务注册之Eureka Server(二) – 启动过程章节中,介绍到Eureka Server启动过程中, 会从其他的Peer Nodes上同步Applications列表,并保存到当前的服务Registry服务中, 具体代码如下:

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // 通过Eureka Client通过applications列表
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // 将Instance信息存放到当前Registry中
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

将Instance信息保存到当前Registry实例中,是通过register方法保存instance信息, 具体实现代码如下:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 获取读锁
            read.lock();

            // 根据注册的instance 中appName获取存储的InstanceInfo列表
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
  
            // 计数
            REGISTER.increment(isReplication);

            // 当gMap为空时,表示了当前appName没有在Registry中注册, 因此构建一个空的Map放入到registry中
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }

            // 根据instanceId获取gMap中存储的实例列表
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());

            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            // 该条件判断了是否存在了instance信息, 如果存在了,则条件成立
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 获取最后一次instance被请求的时间戳
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();

                // 当前注册instance最后一次被请求的时间戳
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
               // 当前判断逻辑为, 如果本地的instance的时间戳大于远程的instance的时间戳,则还是以本地的实例信息为准.说明本地的instance信息是较新的
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // 当前逻辑为: 在本地registry中不存在instance信息, 获取排它锁
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // 更新本地的renews阈值
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }

            // 通过Lease保存当前注册的instance信息
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }

            // 将实例信息保存在gMap中, key -> instance.getId, value -> Lease
            gMap.put(registrant.getId(), lease);

            // 当前recentRegisteredQueue, 根据注释,表示当前的queue只是用调试, 具体没有发现其他用处
           //  // CircularQueues here for debugging/statistics purposes only
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }

            // This is where the initial state transfer of overridden status happens
           // 当前if的逻辑主要是, 判断instance的当前状态是否为UNKOWN, 如果不是UNKOWN, 并且在overriddenInstanceStatusMap不
          // 包含instanceId的状态数据, 则将当前instance的状态加入到overriddenInstanceStatusMap中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }

            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
           // 当前将根据status rules确定当前实例的状态
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            // 当前instance的状态称为UP的时候,将Lease的状态更改为UP状态
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }

            // 将当前instance的操作类型设置为ADDED
            registrant.setActionType(ActionType.ADDED);

            // 最近变更Lease列表
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));

            // 实例最后更新的时间
            registrant.setLastUpdatedTimestamp();

            // 将responseCase中readWriteCacheMap中对应的appName的值设置为过期
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
           // 释放读锁
            read.unlock();
        }
    }

通过以上源码分析,可以看到, 在registry中register实例信息的时候,流程图如下:

eureka spring cloud

数据复制

当有客户端注册instance上来时,如何让其他的eureka server能够获取到当前的注册实例信息呢,源码如下:

@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        // 将当前实例信息注册到其他Peer Nodes节点
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }


通过查看源码可知,在注册完成instance实例信息之后,通过调用replicateToPeers方法将当前实例信息同步到其他的节点, 方法代码如下:

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // 将instance信息注册到其他的PeerNodes节点
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel: // 取消
                    node.cancel(appName, id);
                    break;
                case Heartbeat: // 心跳
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:  // 注册
                    node.register(info);
                    break;
                case StatusUpdate: // 状态更新
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride: // 删除状态
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

通过源码可以得知,通过instance信息,实际上是调用PeerNode的register方法,实现同步的方式,PeerNode的源码如下:

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

在注册的时候,通过replicationClient对象,调用register方法注册实例信息, 具体代码如下:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

总结

通过以上分析可以得知,Eureka Server 之间数据同步主要有两种方式:

  • 当Eureka Server启动时,主动从其他的Peer Nodes上同步实例列表
  • 当Eureka Server接收到客户端服务注册时,主动调用Peer Node将服务注册到其他服务