引入负载均衡器后发起请求的源码分析

前文中讲到的源码和作用,是项目启动时就做好的事情,更像一个准备过程。

什么意思呢?就是在项目启动时,RestTemplateConfig 类的实例对象已经构造好了且随时可以使用,而且它还被 LoadBalancer 自动配置类改造过了,里面多了一个拦截器 LoadBalancerInterceptor,拦截器中所需的 BlockingLoadBalancerClient 类型的实例对象也构造好了且可以随时使用。注意,这些对象都在 IoC 容器中随时待命,并没有真正开始工作。

想要它们开始工作且弄懂它们是怎样工作的,就需要解答下面三个问题。

  1. RestTemplate 中没有拦截器时是怎样工作的?

  2. 被定制化后的 RestTemplate 对象是怎样进入拦截器逻辑的?

  3. 拦截器中请求的发起流程是什么样的?

接下来笔者就结合源码一一解答这些问题。

RestTemplate中没有拦截器时是怎样工作的

RestTemplate 类中的 getForObject() 方法源码如下:

public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
    RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
    HttpMessageConverterExtractor<T> responseExtractor =
            new HttpMessageConverterExtractor<>(responseType, getMessageConverters(),
                    logger);
    return execute(url, HttpMethod.GET, requestCallback, responseExtractor,
            uriVariables);
}

发起请求后,最终会调用 org.springframework.http.client.InterceptingClientHttpRequest 类中的 execute() 方法,源码及注释如下:

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // 判断是否有拦截器
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // 如果有拦截器,就执行其拦截器中的 intercept() 方法
        return nextInterceptor.intercept(request, body, this);
    } else {
        // 如果没有拦截器,就构造 ClientHttpRequest 对象并发起 HTTP 请求
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            } else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

execute() 方法中,会判断是否存在拦截器,如果有拦截器,就执行拦截器中的 intercept() 方法;如果没有拦截器,就构造 ClientHttpRequest 对象并发起请求。比如,第 5 章中的代码案例,当时并没有引入微服务架构中的套件或组件,就是在没有拦截器的情况下直接发起 HTTP 请求,这个过程比较简单。

被定制化后的RestTemplate对象是怎样进入拦截器逻辑的

通过前文的源码分析知道了在 execute() 方法中会判断是否存在拦截器。在引入了微服务架构中的组件后,LoadBalancer 自动配置生效时会向 RestTemplate 类型的实例注入 LoadBalancerInterceptor 拦截器,因此会进入拦截器中的 intercept(),此时的请求处理就由这个拦截器来接管了。

为了更清楚地看懂这个流程,可以在 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor 类的第 53 行打一个断点,并以 Debug 模式启动项目,之后访问如下地址:

http://localhost:8105/consumerTest

请求发出后程序会在这个断点处停住,如图 7-9 所示。

image 2025 04 16 15 53 24 788
Figure 1. 图7-9 引入负载均衡组件后处理请求的方法调用链

在图 7-9 中可以看到完整的方法调用链。

一切都由下方这行编码开始:

return restTemplate.getForObject(SERVICE URL + "/goodsServiceTest", String.class);

之后进入 restTemplate.getForObject() 方法,进入 InterceptingClientHttpRequest 类中的 execute() 方法后会判断是否存在拦截器。现在是存在拦截器的,所以执行 LoadBalancerInterceptor 类中的 intercept() 方法,源码如下:

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                     final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 交给BlockingLoadBalancerClient 类处理
    return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

拦截器中处理请求的流程

引入负载均衡器后,请求最终会交由 BlockingLoadBalancerClient 类来处理。对于 BlockingLoadBalancerClient 类,读者应该不陌生,在服务发现的源码分析中提到了这个类。源码及源码注释如下:

public class BlockingLoadBalancerClient implements LoadBalancerClient {

    // 已省略部分代码

    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
            throws IOException {
        String hint = getHint(serviceId);
        LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest =
                new LoadBalancerRequestAdapter<>(request,
                        new DefaultRequestContext(request, hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors =
                getSupportedLifecycleProcessors(serviceId);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        // 此处尚未获取具体的请求实例,需要调用 choose() 方法获取一个可用的实例信息
        ServiceInstance serviceInstance = choose(serviceId, lbRequest);
        if (serviceInstance == null) {
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                    new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        // 执行下一个 execute() 方法
        return execute(serviceId, serviceInstance, lbRequest);

        @Override
        public <T > T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest < T > request)
        throws IOException {
            DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
            Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
            supportedLifecycleProcessors
                    .forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, serviceInstance));

            try {
                // 此时已获取服务实例并发送请求给一个服务实例,发送 HTTP 请求并接收响应
                T response = request.apply(serviceInstance);
                Object clientResponse = getClientResponse(response);
                supportedLifecycleProcessors
                        .forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
                return response;
            } catch (IOException ioException) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                        new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
                ReflectionUtils.rethrowRuntimeException(exception);
            }
            return null;
        }

        // 获取一个可用的实例信息
        public <T > ServiceInstance choose(String serviceId, Request < T > request) {
            // 获取一个负载均衡器,默认为 RoundRobinLoadBalancer
            ReactiveLoadBalancer<ServiceInstance> loadBalancer =
                    ClientFactory.getInstance(serviceId);
            if (loadBalancer == null) {
                return null;
            }
            // 获取服务实例列表,并根据负载均衡算法获取其中一个可用的实例
            // 在这里触发了服务发现的逻辑,根据服务名称获取 serviceInfoMap 变量中的可用实
            // 例列表,可结合服务发现的源码进行理解
            Response<ServiceInstance> loadBalancerResponse = Mono.from(
                    loadBalancer.choose(request)).block();
            if (loadBalancerResponse == null) {
                return null;
            }
            return loadBalancerResponse.getServer();
        }
    }
}

通过源码可知,这个类的主要作用就是先获取一个负载均衡器,然后调用负载均衡器中的方法获取一个可用的实例,再调用 LoadBalancerRequestFactory 类中的 createRequest() 方法创建请求对象,接着根据已知的实例信息获取真实的请求地址,最后发起请求并接收响应结果。