引入负载均衡器后发起请求的源码分析
前文中讲到的源码和作用,是项目启动时就做好的事情,更像一个准备过程。
什么意思呢?就是在项目启动时,RestTemplateConfig
类的实例对象已经构造好了且随时可以使用,而且它还被 LoadBalancer
自动配置类改造过了,里面多了一个拦截器 LoadBalancerInterceptor
,拦截器中所需的 BlockingLoadBalancerClient
类型的实例对象也构造好了且可以随时使用。注意,这些对象都在 IoC
容器中随时待命,并没有真正开始工作。
想要它们开始工作且弄懂它们是怎样工作的,就需要解答下面三个问题。
-
RestTemplate
中没有拦截器时是怎样工作的? -
被定制化后的
RestTemplate
对象是怎样进入拦截器逻辑的? -
拦截器中请求的发起流程是什么样的?
接下来笔者就结合源码一一解答这些问题。
RestTemplate中没有拦截器时是怎样工作的
RestTemplate
类中的 getForObject()
方法源码如下:
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor =
new HttpMessageConverterExtractor<>(responseType, getMessageConverters(),
logger);
return execute(url, HttpMethod.GET, requestCallback, responseExtractor,
uriVariables);
}
发起请求后,最终会调用 org.springframework.http.client.InterceptingClientHttpRequest
类中的 execute()
方法,源码及注释如下:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
// 判断是否有拦截器
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
// 如果有拦截器,就执行其拦截器中的 intercept() 方法
return nextInterceptor.intercept(request, body, this);
} else {
// 如果没有拦截器,就构造 ClientHttpRequest 对象并发起 HTTP 请求
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
在 execute()
方法中,会判断是否存在拦截器,如果有拦截器,就执行拦截器中的 intercept()
方法;如果没有拦截器,就构造 ClientHttpRequest
对象并发起请求。比如,第 5 章中的代码案例,当时并没有引入微服务架构中的套件或组件,就是在没有拦截器的情况下直接发起 HTTP
请求,这个过程比较简单。
被定制化后的RestTemplate对象是怎样进入拦截器逻辑的
通过前文的源码分析知道了在 execute()
方法中会判断是否存在拦截器。在引入了微服务架构中的组件后,LoadBalancer
自动配置生效时会向 RestTemplate
类型的实例注入 LoadBalancerInterceptor
拦截器,因此会进入拦截器中的 intercept()
,此时的请求处理就由这个拦截器来接管了。
为了更清楚地看懂这个流程,可以在 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor
类的第 53 行打一个断点,并以 Debug
模式启动项目,之后访问如下地址:
http://localhost:8105/consumerTest
请求发出后程序会在这个断点处停住,如图 7-9 所示。

在图 7-9 中可以看到完整的方法调用链。
一切都由下方这行编码开始:
return restTemplate.getForObject(SERVICE URL + "/goodsServiceTest", String.class);
之后进入 restTemplate.getForObject()
方法,进入 InterceptingClientHttpRequest
类中的 execute()
方法后会判断是否存在拦截器。现在是存在拦截器的,所以执行 LoadBalancerInterceptor
类中的 intercept()
方法,源码如下:
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
// 交给BlockingLoadBalancerClient 类处理
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
拦截器中处理请求的流程
引入负载均衡器后,请求最终会交由 BlockingLoadBalancerClient
类来处理。对于 BlockingLoadBalancerClient
类,读者应该不陌生,在服务发现的源码分析中提到了这个类。源码及源码注释如下:
public class BlockingLoadBalancerClient implements LoadBalancerClient {
// 已省略部分代码
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
String hint = getHint(serviceId);
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest =
new LoadBalancerRequestAdapter<>(request,
new DefaultRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors =
getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 此处尚未获取具体的请求实例,需要调用 choose() 方法获取一个可用的实例信息
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
// 执行下一个 execute() 方法
return execute(serviceId, serviceInstance, lbRequest);
@Override
public <T > T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest < T > request)
throws IOException {
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, serviceInstance));
try {
// 此时已获取服务实例并发送请求给一个服务实例,发送 HTTP 请求并接收响应
T response = request.apply(serviceInstance);
Object clientResponse = getClientResponse(response);
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
return response;
} catch (IOException ioException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}
// 获取一个可用的实例信息
public <T > ServiceInstance choose(String serviceId, Request < T > request) {
// 获取一个负载均衡器,默认为 RoundRobinLoadBalancer
ReactiveLoadBalancer<ServiceInstance> loadBalancer =
ClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
// 获取服务实例列表,并根据负载均衡算法获取其中一个可用的实例
// 在这里触发了服务发现的逻辑,根据服务名称获取 serviceInfoMap 变量中的可用实
// 例列表,可结合服务发现的源码进行理解
Response<ServiceInstance> loadBalancerResponse = Mono.from(
loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
}
}
通过源码可知,这个类的主要作用就是先获取一个负载均衡器,然后调用负载均衡器中的方法获取一个可用的实例,再调用 LoadBalancerRequestFactory
类中的 createRequest()
方法创建请求对象,接着根据已知的实例信息获取真实的请求地址,最后发起请求并接收响应结果。