在spring cloud负载均衡之ribbon—demo实现 我们介绍了Ribbon的使用方式,其中给出了完整的demo使用方式,这篇文章将介绍RestTemplate类将介绍与Ribbon配合工作流程。如果对ribbon的工作原理感兴趣,可以参考spring clound负载均衡之Ribbon(三)- 工作原理 , 这篇文章介绍了Ribbon中重要的类型,以及类型的作用。
使用方式
在正式开始介绍RestTemplate方式工作方式之前,我们回顾下如何通过RestTemplate进行远程请求, 具体使用方式如下:
@GetMapping("/info")
public String info() {
return restTemplate.getForObject("http://SPRING-EUREKA-CLIENT-B/info", String.class);
}
在这个简单使用中,我们主要使用了getForObject()方法,请求远程服务,其中有两个参数需要留意:
- 请求URL: 请求URL中包含了对远程请求的服务名称
- 远程服务返回结果,将封装成对一个的结果对象。本demo中则直接使用String字符串。
工作原理
本篇文章将以getForObject()作为主要入口,RestTemplate如何与Ribbon配合并很好的工作的。我们看下RestTemplate的类图结构:
通过类图我们可以得知,RestTemplate与Ribbon的链接渠道是通过RibbonClientHttpRequestFactory进行链接,通过返回的ClientHttpRequest对象将Ribbon组件进行组装。下面我们以getForObject()方法为例,跟踪下具体的源码,查看请求是如何组装与最终执行的。
RestTemplate#getForObject()
该方法作为入口,接收请求信息,直接看源码:
@Override
@Nullable
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
// 创建RequstCallback对象,该对象主要对Request进行回调处理,默认为AcceptHeaderRequestCallback
RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
// 这里创建对response message的返回值进行处理, 通过对返回类型,以及MessageConverters进行绑定
HttpMessageConverterExtractor<T> responseExtractor =
new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
// 调用execute方法
return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
}
public <T> RequestCallback acceptHeaderRequestCallback(Class<T> responseType) {
return new AcceptHeaderRequestCallback(responseType);
}
RestTemplate#execute()
@Override
@Nullable
public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
// 这里是将url中的占位符信息,替换为variables中的具体的值, 组成完整的URI信息
URI expanded = getUriTemplateHandler().expand(url, uriVariables);
// 执行请求
return doExecute(expanded, method, requestCallback, responseExtractor);
}
RestTemplate#doExecute()
这里为正式执行http请求的方法,具体查看源码:
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
// 参数校验
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// 创建ClientHttpRequest独享
ClientHttpRequest request = createRequest(url, method);
// 当RequestCallback不为空时,调用请求回调类, 这里默认为AcceptHeaderRequestCallback
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 执行request请求, 并获取response响应对象
response = request.execute();
// 处理response响应数据
handleResponse(url, method, response);
// 将请求结果转换为指定的responseType, 并返回
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
在doExecute()方法中,逻辑是很明确的,主要为创建Request -> request 回调 -> 发送请求 -> 处理响应结果 -> 类型转换这么一个过程。
HttpAccessor#createRequest()
在上面类图中可以得知,其实创建Request对象是由父类来完成的,因此我们看下创建Request方法的源码:
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
// 这里创建Request对象是通过HttpClientRequestFactory类来实现的
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
InterceptingHttpAccessor#getRequestFactory
在获取RequestFactory的时候,其实InterceptingHttpAccessor类中对HttpClientRequestFactory类型做了扩展,增加了ClientHttpRequestInterceptor的扩展,具体逻辑如下:
@Override
public ClientHttpRequestFactory getRequestFactory() {
// 获取已经注册的interceptors的类型
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
// 如果interceptors不为空,则改变HttpAccessor中的现有逻辑
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
// 创建InterceptingClientHttpRequestFactory对象
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
}
该方法中可能会该变现有的逻辑,主要通过判断是否注册了interceptor,因此当有interceptor存在时,request请求必须先经过interceptor.intercept()方法之后,才会最终走到RibbonClientHttpRequestFactory类中,创建新的request对象发送请求。这里不做展开,感兴趣的可以自己查看。
RibbonClientHttpRequestFactory#createRequest()
最终HttpRequest的创建就在RibbonClientHttpRequestFactory中完成,我们看下源码:
public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
// 获取uri中的host,因为我们发送请求的时候,使用的为serviceId, 因此这里直接就是serviceId信息
String serviceId = originalUri.getHost();
if (serviceId == null) {
throw new IOException(
"Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
}
// 从SpringClientFactory中获取IClientConfig bean, 默认为DefaultClientConfigImpl中实现
IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
// 获取RestClient对象, 默认为OverrideRestClient对象
RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
}
RibbonHttpRequest#execute()
执行http请求, 查看下具体源码信息:
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
try {
// 向http中加入headers信息
addHeaders(headers);
// 当outputstream已经存在时,则关闭,并读取request中的请求信息
if (outputStream != null) {
outputStream.close();
builder.entity(outputStream.toByteArray());
}
// 获取HttpRequest对象,对象中包含了URI与Verb信息
HttpRequest request = builder.build();
// 执行请求,与LoadBalancer一起执行
HttpResponse response = client.executeWithLoadBalancer(request, config);
return new RibbonHttpResponse(response);
}
catch (Exception e) {
throw new IOException(e);
}
}
AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
在通过RestClient调用时,最终都会通过executeWithLoadBalancer()方法进行预处理,具体源码如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 获取LoadBalancerCommand对象
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
// 提交并执行command
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
// 根据server重新组装URI信息
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 执行http请求
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
// 构建LoadBalancerCommand
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
// 设置retry的操作设置
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
// 创建LoadBalancerCommand
LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this) // 设置LoadBalancerContext
.withRetryHandler(handler) // 设置重试
.withLoadBalancerURI(request.getUri()); // 设置uri
// 自定义操作
customizeLoadBalancerCommandBuilder(request, config, builder);
// 构建Command
return builder.build();
}
ZoneAwareLoadBalancer#chooseServer()
因为在Ribbon中还是采用的响应是编程,因此,构建Observable对象时,这里就不贴源码了,最终在Request执行时,则需要从ILoadBalancer中选择一个合适的Server,并返回,具体源码如下:
@Override
public Server chooseServer(Object key) {
// 判断是否启用ZoneAwareLoadBalancer, 或则当前可用的zone只有一个时,直接从当前zone选择server
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
// 获取stats状态信息
LoadBalancerStats lbStats = getLoadBalancerStats();
// 创建缓存数据快照
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
// 从快照中获取可用的快照列表
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
// 可用快照存在时
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
// 随机选择一个可用的zone
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
// 获取区域的LoadBalancer对象
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
// 从当前区域中选择Server
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
// 当选择Server失败时,重新选择Server
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
BaseLoadBalancer#chooseServer()
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 当IRule存在时,从rule中选取Server信息,当前默认为: ZoneAvoidanceRule
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
PredicateBaseRule#choose()
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
// 1. 从LoadBalancer中获取所有的Server列表
// 2. 通过Predicate选择Server
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
AbstractServerPredicate#chooseRoundRobinAfterFiltering()
在该方法中,其实就是一个RoundRobin的算法,轮询的请求后端Server. 具体源码如下:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
// 获取可以使用的Server列表
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
// 根据RoundRobin算法计算对应的index, 并获取Server
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
RibbonClientConfiguration.OverrideRestClient#reconstructURIWithServer()
该方法内部主要是为了将请求地址更新到安全的链接,更新安全链接的前提是IClientConfig将请求链接标记为安全链接时使用,具体源码如下:
@Override
public URI reconstructURIWithServer(Server server, URI original) {
// 判断是否为安全的链接,如果为安全链接在,则更新schema
URI uri = updateToSecureConnectionIfNeeded(original, this.config,
this.serverIntrospector, server);
// 重新组装URI
return super.reconstructURIWithServer(server, uri);
}
LoadBalancerContext#reconstructURIWithServer()
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server.getPort();
String scheme = server.getScheme();
if (host.equals(original.getHost())
&& port == original.getPort()
&& scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
该方法其实很简单,主要是为了根据Server的host以及端口组装成新的URI并返回。
RestClient#execute()
最终Request中还是依赖于RestClient对象来实现,具体源码如下:
private HttpResponse execute(HttpRequest.Verb verb, URI uri,
Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
HttpClientResponse thisResponse = null;
boolean bbFollowRedirects = bFollowRedirects;
// 判断是否接收redirects请求
if (overriddenClientConfig != null
// set whether we should auto follow redirects
&& overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects)!=null){
// use default directive from overall config
Boolean followRedirects = Boolean.valueOf(""+overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects, bFollowRedirects));
bbFollowRedirects = followRedirects.booleanValue();
}
restClient.setFollowRedirects(bbFollowRedirects);
if (logger.isDebugEnabled()) {
logger.debug("RestClient sending new Request(" + verb
+ ": ) " + uri);
}
// 获取WebResource资源
WebResource xResource = restClient.resource(uri.toString());
// 请求参数
if (params != null) {
for (Map.Entry<String, Collection<String>> entry: params.entrySet()) {
String name = entry.getKey();
for (String value: entry.getValue()) {
xResource = xResource.queryParam(name, value);
}
}
}
ClientResponse jerseyResponse;
Builder b = xResource.getRequestBuilder();
if (headers != null) {
for (Map.Entry<String, Collection<String>> entry: headers.entrySet()) {
String name = entry.getKey();
for (String value: entry.getValue()) {
b = b.header(name, value);
}
}
}
Object entity = requestEntity;
switch (verb) {
case GET:
// 发送请求
jerseyResponse = b.get(ClientResponse.class);
break;
case POST:
jerseyResponse = b.post(ClientResponse.class, entity);
break;
case PUT:
jerseyResponse = b.put(ClientResponse.class, entity);
break;
case DELETE:
jerseyResponse = b.delete(ClientResponse.class);
break;
case HEAD:
jerseyResponse = b.head();
break;
case OPTIONS:
jerseyResponse = b.options(ClientResponse.class);
break;
default:
throw new ClientException(
ClientException.ErrorType.GENERAL,
"You have to one of the REST verbs such as GET, POST etc.");
}
// 将结果封装成为HttpClientResponse
thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
if (thisResponse.getStatus() == 503){
thisResponse.close();
throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
}
return thisResponse;
}
关于RestTemplate相关的源码就到这里,后面江湖补充RestTemplate请求的序列图,以便于更好的理解。
