spring · 3 10 月, 2021 0

spring cloud 之负载均衡Ribbon(四)—RestTemplate

spring cloud负载均衡之ribbon—demo实现 我们介绍了Ribbon的使用方式,其中给出了完整的demo使用方式,这篇文章将介绍RestTemplate类将介绍与Ribbon配合工作流程。如果对ribbon的工作原理感兴趣,可以参考spring clound负载均衡之Ribbon(三)- 工作原理 , 这篇文章介绍了Ribbon中重要的类型,以及类型的作用。

使用方式

在正式开始介绍RestTemplate方式工作方式之前,我们回顾下如何通过RestTemplate进行远程请求, 具体使用方式如下:

@GetMapping("/info")
    public String info() {
        return restTemplate.getForObject("http://SPRING-EUREKA-CLIENT-B/info", String.class);
    }

在这个简单使用中,我们主要使用了getForObject()方法,请求远程服务,其中有两个参数需要留意:

  • 请求URL: 请求URL中包含了对远程请求的服务名称
  • 远程服务返回结果,将封装成对一个的结果对象。本demo中则直接使用String字符串。

工作原理

本篇文章将以getForObject()作为主要入口,RestTemplate如何与Ribbon配合并很好的工作的。我们看下RestTemplate的类图结构:

RestTemplate

通过类图我们可以得知,RestTemplateRibbon的链接渠道是通过RibbonClientHttpRequestFactory进行链接,通过返回的ClientHttpRequest对象将Ribbon组件进行组装。下面我们以getForObject()方法为例,跟踪下具体的源码,查看请求是如何组装与最终执行的。

RestTemplate#getForObject()

该方法作为入口,接收请求信息,直接看源码:

    @Override
    @Nullable
    public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
        // 创建RequstCallback对象,该对象主要对Request进行回调处理,默认为AcceptHeaderRequestCallback
        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        // 这里创建对response message的返回值进行处理, 通过对返回类型,以及MessageConverters进行绑定
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        // 调用execute方法
        return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
    }

   public <T> RequestCallback acceptHeaderRequestCallback(Class<T> responseType) {
      return new AcceptHeaderRequestCallback(responseType);
   }

RestTemplate#execute()

@Override
    @Nullable
    public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

        // 这里是将url中的占位符信息,替换为variables中的具体的值, 组成完整的URI信息
        URI expanded = getUriTemplateHandler().expand(url, uriVariables);
        // 执行请求
        return doExecute(expanded, method, requestCallback, responseExtractor);
    }

RestTemplate#doExecute()

这里为正式执行http请求的方法,具体查看源码:

@Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        // 参数校验
        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            // 创建ClientHttpRequest独享
            ClientHttpRequest request = createRequest(url, method);

            // 当RequestCallback不为空时,调用请求回调类, 这里默认为AcceptHeaderRequestCallback
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            // 执行request请求, 并获取response响应对象
            response = request.execute();
            // 处理response响应数据
            handleResponse(url, method, response);
            // 将请求结果转换为指定的responseType, 并返回
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }

doExecute()方法中,逻辑是很明确的,主要为创建Request -> request 回调 -> 发送请求 -> 处理响应结果 -> 类型转换这么一个过程

HttpAccessor#createRequest()

在上面类图中可以得知,其实创建Request对象是由父类来完成的,因此我们看下创建Request方法的源码:

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        // 这里创建Request对象是通过HttpClientRequestFactory类来实现的
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP " + method.name() + " " + url);
        }
        return request;
    }

InterceptingHttpAccessor#getRequestFactory

在获取RequestFactory的时候,其实InterceptingHttpAccessor类中对HttpClientRequestFactory类型做了扩展,增加了ClientHttpRequestInterceptor的扩展,具体逻辑如下:

@Override
    public ClientHttpRequestFactory getRequestFactory() {
        // 获取已经注册的interceptors的类型
        List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
        // 如果interceptors不为空,则改变HttpAccessor中的现有逻辑
        if (!CollectionUtils.isEmpty(interceptors)) {
            ClientHttpRequestFactory factory = this.interceptingRequestFactory;
            if (factory == null) {
                // 创建InterceptingClientHttpRequestFactory对象
                factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
                this.interceptingRequestFactory = factory;
            }
            return factory;
        }
        else {
            return super.getRequestFactory();
        }
    }

}

该方法中可能会该变现有的逻辑,主要通过判断是否注册了interceptor,因此当有interceptor存在时,request请求必须先经过interceptor.intercept()方法之后,才会最终走到RibbonClientHttpRequestFactory类中,创建新的request对象发送请求。这里不做展开,感兴趣的可以自己查看。

RibbonClientHttpRequestFactory#createRequest()

最终HttpRequest的创建就在RibbonClientHttpRequestFactory中完成,我们看下源码:

public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
        // 获取uri中的host,因为我们发送请求的时候,使用的为serviceId, 因此这里直接就是serviceId信息
        String serviceId = originalUri.getHost();
        if (serviceId == null) {
            throw new IOException(
                    "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
        }
     
        // 从SpringClientFactory中获取IClientConfig bean, 默认为DefaultClientConfigImpl中实现
        IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
        
        // 获取RestClient对象, 默认为OverrideRestClient对象
        RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
        HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());

        return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
    }

RibbonHttpRequest#execute()

执行http请求, 查看下具体源码信息:

@Override
    protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
        try {
            // 向http中加入headers信息
            addHeaders(headers);
            // 当outputstream已经存在时,则关闭,并读取request中的请求信息
            if (outputStream != null) {
                outputStream.close();
                builder.entity(outputStream.toByteArray());
            }

            // 获取HttpRequest对象,对象中包含了URI与Verb信息
            HttpRequest request = builder.build();
            // 执行请求,与LoadBalancer一起执行
            HttpResponse response = client.executeWithLoadBalancer(request, config);
            return new RibbonHttpResponse(response);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

AbstractLoadBalancerAwareClient#executeWithLoadBalancer()

在通过RestClient调用时,最终都会通过executeWithLoadBalancer()方法进行预处理,具体源码如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        // 获取LoadBalancerCommand对象
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            // 提交并执行command
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        // 根据server重新组装URI信息
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            // 执行http请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }

// 构建LoadBalancerCommand
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
        // 设置retry的操作设置
        RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
        // 创建LoadBalancerCommand
        LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
                .withLoadBalancerContext(this) // 设置LoadBalancerContext
                .withRetryHandler(handler) // 设置重试
                .withLoadBalancerURI(request.getUri()); // 设置uri
        // 自定义操作
        customizeLoadBalancerCommandBuilder(request, config, builder);
        // 构建Command
        return builder.build();
    }

ZoneAwareLoadBalancer#chooseServer()

因为在Ribbon中还是采用的响应是编程,因此,构建Observable对象时,这里就不贴源码了,最终在Request执行时,则需要从ILoadBalancer中选择一个合适的Server,并返回,具体源码如下:

@Override
   public Server chooseServer(Object key) {
       // 判断是否启用ZoneAwareLoadBalancer, 或则当前可用的zone只有一个时,直接从当前zone选择server
       if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
           logger.debug("Zone aware logic disabled or there is only one zone");
           return super.chooseServer(key);
       }
       Server server = null;
       try {
           // 获取stats状态信息
           LoadBalancerStats lbStats = getLoadBalancerStats();
           // 创建缓存数据快照
           Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
           logger.debug("Zone snapshots: {}", zoneSnapshot);
           if (triggeringLoad == null) {
               triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                       "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
           }

           if (triggeringBlackoutPercentage == null) {
               triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                       "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
           }
            
           // 从快照中获取可用的快照列表
           Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
           logger.debug("Available zones: {}", availableZones);

           // 可用快照存在时
           if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
               // 随机选择一个可用的zone
               String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
               logger.debug("Zone chosen: {}", zone);
               if (zone != null) {
                   // 获取区域的LoadBalancer对象
                   BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                   // 从当前区域中选择Server
                   server = zoneLoadBalancer.chooseServer(key);
               }
           }
       } catch (Exception e) {
           logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
       }
       if (server != null) {
           return server;
       } else {
           // 当选择Server失败时,重新选择Server
           logger.debug("Zone avoidance logic is not invoked.");
           return super.chooseServer(key);
       }
   }

BaseLoadBalancer#chooseServer()

public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // 当IRule存在时,从rule中选取Server信息,当前默认为: ZoneAvoidanceRule
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

PredicateBaseRule#choose()

public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // 1. 从LoadBalancer中获取所有的Server列表
        // 2. 通过Predicate选择Server
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

AbstractServerPredicate#chooseRoundRobinAfterFiltering()

在该方法中,其实就是一个RoundRobin的算法,轮询的请求后端Server. 具体源码如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        // 获取可以使用的Server列表
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        // 根据RoundRobin算法计算对应的index, 并获取Server
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

RibbonClientConfiguration.OverrideRestClient#reconstructURIWithServer()

该方法内部主要是为了将请求地址更新到安全的链接,更新安全链接的前提是IClientConfig将请求链接标记为安全链接时使用,具体源码如下:

@Override
        public URI reconstructURIWithServer(Server server, URI original) {
            // 判断是否为安全的链接,如果为安全链接在,则更新schema
            URI uri = updateToSecureConnectionIfNeeded(original, this.config,
                    this.serverIntrospector, server);
            // 重新组装URI
            return super.reconstructURIWithServer(server, uri);
        }

LoadBalancerContext#reconstructURIWithServer()

public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        
        if (host.equals(original.getHost()) 
                && port == original.getPort()
                && scheme == original.getScheme()) {
            return original;
        }
        if (scheme == null) {
            scheme = original.getScheme();
        }
        if (scheme == null) {
            scheme = deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            URI newURI = new URI(sb.toString());
            return newURI;            
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

该方法其实很简单,主要是为了根据Serverhost以及端口组装成新的URI并返回。

RestClient#execute()

最终Request中还是依赖于RestClient对象来实现,具体源码如下:

private HttpResponse execute(HttpRequest.Verb verb, URI uri,
            Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
            IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
        HttpClientResponse thisResponse = null;
        boolean bbFollowRedirects = bFollowRedirects;
        // 判断是否接收redirects请求
        if (overriddenClientConfig != null
        		// set whether we should auto follow redirects
        		&& overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects)!=null){
        	// use default directive from overall config
        	Boolean followRedirects = Boolean.valueOf(""+overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects, bFollowRedirects));
        	bbFollowRedirects = followRedirects.booleanValue();
        }
        restClient.setFollowRedirects(bbFollowRedirects);

        if (logger.isDebugEnabled()) {
            logger.debug("RestClient sending new Request(" + verb
                    + ": ) " + uri);
        }

        // 获取WebResource资源
        WebResource xResource = restClient.resource(uri.toString());
        // 请求参数
        if (params != null) {
            for (Map.Entry<String, Collection<String>> entry: params.entrySet()) {
                String name = entry.getKey();
                for (String value: entry.getValue()) {
                    xResource = xResource.queryParam(name, value);
                }
            }
        }
        ClientResponse jerseyResponse;

        Builder b = xResource.getRequestBuilder();

        if (headers != null) {
            for (Map.Entry<String, Collection<String>> entry: headers.entrySet()) {
                String name = entry.getKey();
                for (String value: entry.getValue()) {
                    b = b.header(name, value);
                }
            }
        }
        Object entity = requestEntity;
        
        switch (verb) {
        case GET:
            // 发送请求
            jerseyResponse = b.get(ClientResponse.class);
            break;
        case POST:
            jerseyResponse = b.post(ClientResponse.class, entity);
            break;
        case PUT:
            jerseyResponse = b.put(ClientResponse.class, entity);
            break;
        case DELETE:
            jerseyResponse = b.delete(ClientResponse.class);
            break;
        case HEAD:
            jerseyResponse = b.head();
            break;
        case OPTIONS:
            jerseyResponse = b.options(ClientResponse.class);
            break;
        default:
            throw new ClientException(
                    ClientException.ErrorType.GENERAL,
                    "You have to one of the REST verbs such as GET, POST etc.");
        }

        // 将结果封装成为HttpClientResponse
        thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
        if (thisResponse.getStatus() == 503){
            thisResponse.close();
            throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
        }
        return thisResponse;
    }

关于RestTemplate相关的源码就到这里,后面江湖补充RestTemplate请求的序列图,以便于更好的理解。