Nacos整合之服务发现编码实践

就像用微信与远方的朋友视频通话,一方发起视频通话,另一方就可以接听了。本节笔者将讲解服务治理中的服务发现流程,编写一个服务实例并将其注册至 Nacos 服务中心,重要知识点为代码整合步骤、Nacos 服务中心相关的配置项和服务发现的源码。

编写服务消费端的代码

在前文提供的代码的基础上,新建一个模块,并将其命名为 nacos-consumer-demoJava 代码的包名为 ltd.newbee.cloud。在该模块的 pom.xml 配置文件中增加 parent 标签,与上层 Maven 建立好关系。之后,在这个子模块的 pom.xml 文件中加入 Nacos 的依赖项 spring-cloud-starter-alibaba-nacos-discovery。最终子节点 nacos-consumer-demopom.xml 源码如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ltd.newbee.cloud</groupId>
    <artifactId>nacos-consumer-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>nacos-consumer-demo</name>
    <description>Spring Cloud Alibaba Nacos Consumer Demo</description>
    <parent>
        <groupId>ltd.newbee.cloud</groupId>
        <artifactId>spring-cloud-alibaba-nacos-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>
</project>

nacos-consumer-demo 中进行简单的功能编码。先把该 Spring Boot 项目的端口号设置为 8093,然后创建 ltd.newbee.cloud.api 包,并在该包中新建 ConsumerTestController 类,代码如下:

package ltd.newbee.cloud.api;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class ConsumerTestController {

    // 测试方法,暂未通过 Nacos 调用下级服务
    @GetMapping("/nacosRegTest")
    public String nacosRegTest() {
        return "nacosRegTest";
    }
}

将启动类命名为 ConsumerApplication,代码如下:

package ltd.newbee.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

基础编码完成。

将服务注册至Nacos

下面就要把 nacos-consumer-demo 注册到 Nacos 中。在 application.properties 配置文件中添加 Nacos 配置项,代码如下:

# 项目启动端口
server.port=8093
# 应用名称
spring.application.name=newbee-cloud-consumer-service
# 服务中心 Nacos 地址
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# 注册名(默认为 nacos,可自行修改)
spring.cloud.nacos.username=nacos
# 登录密码(默认为 nacos,可自行修改)
spring.cloud.nacos.password=nacos

启动 nacos-consumer-demo 项目,并验证其是否注册到 Nacos 中。成功启动后进入 Nacos 控制台,单击 “服务管理” 中的 “服务列表”,可以看到列表中的服务信息,如图 6-15 所示。newbee-cloud-consumer-service 注册成功。

image 2025 04 16 12 39 41 465
Figure 1. 图6-15 Nacos 控制台中的“服务列表”页面

注册至 Nacos 成功后,下面就要编写与服务通信相关的代码了。

编写服务通信代码

下面借助 RestTemplate 工具实现整合 Nacos 后服务之间的服务通信。

newbee-cloud-consumer-service 项目中新建 config 包,并新建 RestTemplate 的配置类,代码如下:

package ltd.newbee.cloud.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.Charset;

@Configuration
public class RestTemplateConfig {

    @LoadBalanced // 负载均衡
    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
        RestTemplate restTemplate = new RestTemplate(factory);
        // UTF-8 编码设置
        restTemplate.getMessageConverters().set(1,
                new StringHttpMessageConverter(Charset.forName("UTF-8")));
        return restTemplate;
    }

    @Bean
    public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // 超时时间为 10 秒
        factory.setReadTimeout(10 * 1000);
        // 超时时间为 5 秒
        factory.setConnectTimeout(5 * 1000);
        return factory;
    }
}

关于 RestTemplate 工具读者应该并不陌生,在第 5 章中已做过介绍。不同的是,此处多了一个 @LoadBalanced 注解,添加此注解后,RestTemplate 就有了客户端负载均衡的功能。如果不加这个注解,在服务调用时就会出现如下报错信息:

java.net.UnknownHostException

这也是服务消费端与服务提供端的不同。不仅需要添加 @LoadBalanced 注解,消费端在发起调用时还需要添加负载均衡模块,因此需要在 nacos-consumer-demo 中引入如下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

关于负载均衡的相关知识会在第 7 章中进行详细讲解。

ConsumerTestController 类中新增如下代码:

private final String SERVICE_URL = "http://newbee-cloud-goods-service";

// 通过 Nacos 调用下级服务
@GetMapping("/consumerTest")
public String consumerTest() {
    return restTemplate.getForObject(SERVICE_URL + "/goodsServiceTest", String.class);
}

在上述代码中,SERVICE_URL 变量的写法变成了 “http://”+服务名称,而不是 “http://”+IP地址+端口号。不过,由于服务中心存在,这样也能够完成对 newbee-cloud-goods-service 的调用。启动 newbee-cloud-consumer-service 项目,打开浏览器并输入如下地址:

http://localhost:8093/consumerTest

访问后的结果如图 6-16 所示。

image 2025 04 16 12 47 31 409
Figure 2. 图6-16 访问结果

可以看出,已经成功获取 newbee-cloud-goods-service 服务中的接口响应。当然,前提是 newbee-cloud-goods-service 成功启动并注册到 Nacos 中,否则会报 500 错误。

接下来又到思考时间了。

“newbee-cloud-goods-service” 这个字符串是如何被转换成 IP地址+端口号的?因为直接使用 “http://”+服务名称的方式是无法正确地发起 HTTP 请求的。本节主要介绍服务发现的知识,读者应该也能猜到是因为服务发现机制的存在,通过服务名称能够获取服务信息,而服务信息中包括 IP 地址、端口号等字段。

那么问题来了,“newbee-cloud-goods-service” 这个字符串被转换成 IP地址+端口号是在什么时候完成的?

再进一步思考,服务发现这四个字背后的原理是什么?服务发现是在什么时候开始的?服务发现又做了什么?

服务发现的源码分析

为了让读者知晓服务发现的原理,笔者结合源码和 Spring Boot 框架的自动装配(AutoConfiguration)机制来讲解。

服务发现机制的自动配置源码分析

下面讲解与服务发现相关的自动配置。前文中讲到了服务发现的源码,这部分源码在 spring-cloud-starter-alibaba-nacos-discovery-2021.0.1.0.jar 中。spring-cloud-starter-alibaba-nacos-discovery-2021.0.1.0.jar 中不仅有服务注册的自动配置类,与服务发现相关的自动配置类也在这个 JAR 包中,如图 6-17 所示。

image 2025 04 16 12 51 25 318
Figure 3. 图6-17 服务发现的自动配置类截图

服务发现自动装配的主角是 NacosDiscoveryAutoConfiguration 类,源码如下(已省略部分代码)。

package com.alibaba.cloud.nacos.discovery;
// 配置类
@Configuration(proxyBeanMethods = false)
// 自动配置生效条件 1
@ConditionalOnDiscoveryEnabled
// 自动配置生效条件 2
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

    @Bean // 注册 NacosDiscoveryProperties 到 IoC 容器中
    @ConditionalOnMissingBean // 当前 IoC 容器中不存在 NacosDiscoveryProperties 类型的 Bean 时注册,如果已经存在,则不会再次注册
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    }

    @Bean // 注册 NacosServiceDiscovery 到 IoC 容器中
    @ConditionalOnMissingBean // 当前 IoC 容器中不存在 NacosServiceDiscovery 类型的 Bean 时注册,如果已经存在,则不会再次注册
    public NacosServiceDiscovery nacosServiceDiscovery(
            NacosDiscoveryProperties discoveryProperties,
            NacosServiceManager nacosServiceManager) {
        return new NacosServiceDiscovery(discoveryProperties,
                nacosServiceManager);
    }
}

与服务注册的自动配置类 NacosServiceRegistryAutoConfiguration 的生效条件一致,也是 spring.cloud.service-registry.auto-registration.enabled=truespring.cloud.nacos.discovery.enabled=true。因为这两个配置项的默认值都是 true,所以服务发现的自动配置类 NacosDiscoveryAutoConfiguration 也会生效。

该自动配置类生效后会向 IoC 容器中注册两个 Bean,分别是 NacosDiscoveryProperties 类和 NacosServiceDiscovery 类。NacosDiscoveryProperties 类负责读取配置文件中与 Nacos 相关的配置项,不多做描述。重点关注一下 NacosServiceDiscovery 类,源码及注释如下:

package com.alibaba.cloud.nacos.discovery;

public class NacosServiceDiscovery {

    private NacosDiscoveryProperties discoveryProperties;

    private NacosServiceManager nacosServiceManager;

    public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
                                 NacosServiceManager nacosServiceManager) {
        this.discoveryProperties = discoveryProperties;
        this.nacosServiceManager = nacosServiceManager;
    }

    // 根据服务名称获取所有已注册的服务实例清单
    public List<ServiceInstance> getInstances(String serviceId) throws
            NacosException {
        String group = discoveryProperties.getGroup();
        // 实际调用 NamingService 中的 selectInstances() 方法获取 Instance 列表
        List<Instance> instances = namingService().selectInstances(serviceId, group,
                true);
        // 类型转换
        return hostToServiceInstanceList(instances, serviceId);
    }

    // 获取所有的服务名称
    public List<String> getServices() throws NacosException {
        String group = discoveryProperties.getGroup();
        ListView<String> services = namingService().getServicesOfServer(1,
                Integer.MAX_VALUE, group);
        return services.getData();
    }

    // 省略部分代码
}

NacosServiceDiscovery 是服务发现的功能类,需要重点关注的方法是 getInstances(),该方法会实际调用 NamingService 类中的 selectInstances() 方法获取 Instance 列表。继续跟入 selectInstances() 方法,可以看到该方法的具体实现源码:

// 根据服务名称获取 Instance 列表
public List<Instance> selectInstances(String serviceName, String groupName,
                                      List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    if (subscribe) {
        // 调用 HostReactor 类中的 getServiceInfo() 方法获取实例列表,并开启线程定时更新
        serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        // 调用 HostReactor 类中的 getServiceInfoDirectlyFromServer() 方法获取实例列表,这个方法直接向 Nacos Server 发起调用并获取数据,不会将服务列表放入本地内存进行维护和更新
        serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    // 工具方法,类型转换
    return this.selectInstances(serviceInfo, healthy);
}

继续跟入 getServiceInfo() 方法,该方法位于 com.alibaba.nacos.client.naming.core.HostReactor 类中(这个类中的源码一定要重点学习,是核心类),源码及源码注释如下:

public ServiceInfo getServiceInfo(String serviceName, String clusters) {
    LogUtils.NAMING_LOGGER.debug("[failover-mode] failoverSwitch: " +
            this.failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (this.failoverReactor.isFailoverSwitch()) {
        return this.failoverReactor.getService(key);
    } else {
        // 从 ServiceInfoMap 中获取实例信息
        ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) {
            // 如果 ServiceInfoMap 中没有该实例信息,则直接向 Nacos Server 请求
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            // 向 Nacos Server 请求获取实例信息,并存放到 serviceInfoMap 中
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (this.updatingMap.containsKey(serviceName)) {
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(5000L);
                } catch (InterruptedException var8) {
                    LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" +
                            serviceName + ", clusters:" + clusters, var8);
                }
            }
        }
        return serviceObj;
    }
    // 开启一个线程,定时更新 serviceInfoMap 中的实例数据
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 从 serviceInfoMap 中获取该实例信息并返回
    return (ServiceInfo) this.serviceInfoMap.get(serviceObj.getKey());
}

通过对源码的梳理,可以得知服务发现的自动配置过程最终会向 IoC 容器中注册两个 Bean,分别是 NacosDiscoveryProperties 类和 NacosServiceDiscovery 类。NacosServiceDiscovery 类中有方法可以向 Nacos Server 发起请求并获取服务实例信息,并最终保存到 HostReactor 类的 serviceInfoMap 变量中。serviceInfoMap 的定义和初始化代码如下:

private final Map<String, ServiceInfo> serviceInfoMap;

// 省略部分代码

if (loadCacheAtStart) {
    this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(
        DiskCache.read(this.cacheDir));
} else {
    this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}

serviceInfoMap 是一个 ConcurrentHashMap 类型的 Map 对象,用于存放实例信息。

另外,有一点需要注意,服务发现自动配置之后,只是向 IoC 容器中注册了两个 Bean,此时 serviceInfoMap 变量中并没有额外的实例信息存进来。比如,本节中演示的 newbee-cloud-goods-service 服务,在 nacos-consumer-demo 项目启动后,serviceInfoMap 中是没有 newbee-cloud-goods-service 服务信息的。这里需要一个触发条件,就是要实际调用过 NacosServiceDiscovery 类中的 getInstances(String serviceId) 方法,这样才知道需要拉取哪个服务,进而向 serviceInfoMap 变量中存放一个实例信息。

比如,在当前的演示项目中,ConsumerTestController 类中的 consumerTest() 方法会调用另外一个服务,这个服务名称是 “newbee-cloud-goods-service”,在代码执行过程中需要向 Nacos Server 发起请求并获取名称为 “newbee-cloud-goods-service” 的实例信息,获取之后才会存放到 serviceInfoMap 变量中。理解上面这段话,笔者将结合实际的代码执行流程来讲解。先以 Debug 模式启动 nacos-consumer-demo 项目,然后在 com.alibaba.nacos.client.naming.core.HostReactor 类中的第 315 行和第 340 行打上两个断点(不同版本的代码可能行号不同,具体可以结合笔者给的代码截图来打断点)。

访问如下地址:

http://localhost:8089/consumerTest

注意,这是第一次访问。

请求发起后,程序在 HostReactor 类的第 315 行这个断点处停住了,如图 6-18 所示。

image 2025 04 16 13 08 11 151
Figure 4. 图6-18 第一次访问时在 HostReactor 类的第315行处停住

由图 6-18 可知,此时 serviceObj 变量为 null,也就是说 serviceInfoMap 变量中并没有名称为 “newbee-cloud-goods-service” 的实例信息。因此,直接在第 315 行的这个断点处停住。serviceObj 变量为 null,会执行第 320 行的 updateServiceNow() 方法,向 Nacos Server 请求获取实例信息并存放到 serviceInfoMap 变量中。

接下来,单击 Debug 放行按钮跳过第 315 行的这个断点。此时,程序在进入第 340 行的断点处停住,如图 6-19 所示。

这时 serviceInfoMap 变量中已经有名称为 “newbee-cloud-goods-service” 的实例信息了,因为在第 320 行时已经获取到了。放行这个断点,继续进行第二次访问。

第二次访问如下地址:

http://localhost:8093/consumerTest

注意,这已经不是第一次访问了。

请求发起后,程序直接在 HostReactor 类的第 340 行这个断点处停住,并没有如第一次访问时在第 315 行处停住,如图 6-20 所示。

image 2025 04 16 13 11 13 313
Figure 5. 图6-19 第一次访问时 serviceInfoMap 变量的值
image 2025 04 16 13 16 34 578
Figure 6. 图6-20 第二次访问时 serviceObj 变量的值

这是因为 serviceInfoMap 变量中已经存在名称为 “newbee-cloud-goods-service” 的实例信息了,直接返回数据给上层方法即可。

以上就是服务发现机制的基础源码分析,通过源码分析可知,服务的实例信息是存放在 HostReactor 类的 serviceInfoMap 变量中的。想要获取某个名称的服务信息,最终都要读取 serviceInfoMap 变量。如果这条数据存在,则直接返回;如果不存在,则直接向 Nacos Server 发起请求并存放到 serviceInfoMap 变量中,之后返回这条数据给上层调用方法。

将服务信息转换为请求地址的源码分析

通过服务发现机制的基础源码分析可知,在向另一个服务发起请求时,服务的实例信息是已经获取到的,否则无法正确发起请求。接下来,笔者将结合源码来解答下面这个问题。

“newbee-cloud-goods-service” 这个字符串被转换成 IP地址+端口号 是在什么时候完成的?

Debug 模式下,通过方法调用栈的追踪,定位到在向另一个服务发起请求时,实际执行的是 BlockingLoadBalancerClient 类中的 execute() 方法,如图 6-21 所示。通过查看源码可知,在 BlockingLoadBalancerClient 类的第 80 行已经获取名称为 “newbee-cloud-goods-service” 的实例信息了,在 BlockingLoadBalancerClient 类的第 98 行已经获取响应结果了。因此,服务信息转换成 IP地址+端口号肯定是在 apply() 方法中完成的。

image 2025 04 16 13 18 20 119
Figure 7. 图6-21 在 BlockingLoadBalancerClient 类中的调试过程

BlockingLoadBalancerClient 类的第 98 行打上断点,查看其中的源码,最终得到了想要的结果。在实际发送请求前,HttpRequest 对象会在 org.springframework.cloud.client.loadbalancer.LoadBalancerRequestFactory 类的 createRequest() 方法中被重新包装成 ServiceRequestWrapper 对象,发起对另一个服务的请求。

createRequest() 方法中打上断点,在 IDEA 编辑器中单击鼠标右键,在弹出的快捷菜单中选择 “Evaluate Expression” 选项,并在输入框中输入如下表达式:

serviceRequest.getURI()

该表达式是 serviceRequest 的请求地址,最终得出结果,如图 6-22 所示。

image 2025 04 16 13 19 50 388
Figure 8. 图6-22 serviceRequest 的请求结果

于是第一次看到了由服务名称获取的真实请求地址。在 ServiceRequestWrapper 类中定义了 getURI() 的方法,用来返回拼接好的地址信息,方法定义如下:

public URI getURI() {
    URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}

通过跟入源码,最终得到将实例信息拼接成 URI 字符串的代码:

private static URI doReconstructURI(ServiceInstance serviceInstance, URI original) {

    String host = serviceInstance.getHost();
    String scheme = Optional.ofNullable(serviceInstance.getScheme())
            .orElse(computeScheme(original, serviceInstance));
    int port = computePort(serviceInstance.getPort(), scheme);

    if (Objects.equals(host, original.getHost()) && port == original.getPort()
            && Objects.equals(scheme, original.getScheme())) {
        return original;
    }

    boolean encoded = containsEncodedParts(original);
    return UriComponentsBuilder.fromUri(original).scheme(scheme).host(host).port(port).build(encoded).toUri();
}

在这里将获取的服务实例信息中的数据和 URI 变量中的数据进行拼接,最终完成了由 “newbee-cloud-goods-service” 字符串转换成真实的请求地址的过程,最终发起请求并获取正确的响应结果。

服务发现机制中实例信息的更新流程

在通过服务名称调用另一个服务前,会获取对应的服务信息。通过前文中的源码分析可知,这个数据存放在 HostReactor 类的 serviceInfoMap 变量中。serviceInfoMap 变量的存储格式如图 6-23 所示。

image 2025 04 16 13 22 32 187
Figure 9. 图6-23 serviceInfoMap 变量的存储格式

serviceInfoMap 变量中肯定有不止一个服务的数据,这里以 newbee-cloud-goods-service 服务为例来讲解。在 Nacos Server 中有三个以 “newbee-cloud-goods-service” 命名的实例,请求并存放到 serviceInfoMap 变量中的数据结构如图 6-23 所示。在 serviceInfoMap 中有一个 key 为 “xxx_xxx@@newbee-cloud-goods-service” 的 ServiceInfo 对象。ServiceInfo 对象中有一个 hosts 变量,类型为 Listhosts 变量中有所有的实例信息,包括 192.168.1.101:9009、192.168.1.102:9009、192.168.1.103:9009 这三个服务实例的信息。

在前文的源码分析中,getServiceInfo() 的实现方式是判断 serviceInfoMap 是否存在某个服务,如果不存在,就请求 Nacos Server 拉取这个服务的所有实例信息;如果存在,就不会再次请求 Nacos Server,而是直接返回 serviceInfoMap 中的数据。于是问题就来了:如果 Nacos Server 中的服务实例数量增加或减少了,怎样保证 serviceInfoMap 中存放的数据的准确性呢?

比如,192.168.1.101:9009 上的 newbee-cloud-goods-service 服务关闭了,或者新注册了一个 192.168.1.104:9009 上的 newbee-cloud-goods-service 服务,Nacos Server 上肯定是最新的、最准确的数据,而 serviceInfoMap 变量中的数据还是 192.168.1.101:9009192.168.1.102:9009192.168.1.103:9009 这三个服务实例的信息。实例数据不准确怎么办?或者说 Nacos 的服务发现机制是如何完善的?

接下来结合源码介绍 serviceInfoMap 中实例信息的更新机制。

在分析 com.alibaba.nacos.client.naming.core.HostReactor 类的 getServiceInfo() 方法时,有一行代码笔者并没有详细介绍。这行代码里就包含了实例信息的更新机制,代码如下:

public ServiceInfo getServiceInfo(String serviceName, String clusters) {
    // 省略部分代码
    // 开启一个线程,定时更新 serviceInfoMap 中的实例数据
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 从 serviceInfoMap 中获取该实例信息并返回
    return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
}

其中,scheduleUpdateIfAbsent() 方法就是用来更新 serviceInfoMap 中的实例数据的,继续跟入源码,代码及注释如下:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        // 开启一个延时任务 UpdateTask,更新某个服务的实例信息
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
}

UpdateTaskHostReactor 类的内部类,实现了 Runnable 接口。在这里,延时任务执行后,会执行 UpdateTaskrun() 方法,源码及注释如下:

public class UpdateTask implements Runnable {

    // 省略部分代码

    @Override
    public void run() {
        long delayTime = DEFAULT_DELAY;
        try {
            // 根据 serviceName 获取 serviceInfoMap 变量中对应服务的实例数据
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

            if (serviceObj == null) {
                // 如果为空,则直接向 Nacos Server 请求实例信息,并存放到 serviceInfoMap 中的 updateService(serviceName, clusters);
                return;
            }

            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // 过期服务 (服务的最新更新时间小于等于 lastRefTime),从注册中心重新查询
                updateService(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // 未过期,不对 serviceMapInfo 中的数据做额外操作
                refreshOnly(serviceName, clusters);

                // 刷新 lastRefTime
                lastRefTime = serviceObj.getLastRefTime();

                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    NAMING_LOGGER.info("[NA] update task is stopped, service:" +
                            serviceName + ", clusters:" + clusters);
                    return;
                }

                // 如果没有可用的实例信息,则统计失败次数,次数加 1
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }

                // 延迟任务的下次执行时间
                delayTime = serviceObj.getCacheMillis();

                // 失败次数清零
                resetFailCount();
            } catch(Throwable e){
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName:" + serviceName, e);
            } finally{
                // 开始循环,继续开启延迟任务,异步执行 UpdateTask 线程,更新某个服务实例信息
                // 下次执行的时间与 failCount 有关。若 failCount=0,则下次调度时间为 1 秒,
                // 最长间隔为 1 分钟
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

serviceInfoMap 变量中新增一条服务实例数据的同时,会开启一个异步线程来定时更新这个服务的实例信息。在 UpdateTask 线程的 run() 方法的 finally 代码块中,会再次开启一个异步线程并再次执行 UpdateTask 线程的 run() 方法,开启 “套娃模式” 来定时更新数据。这就是服务发现机制中实例信息更新流程的一种方式,每间隔一段时间,就会向 Nacos Server 直接发起请求并将最新的实例信息更新到 serviceInfoMap 变量中。这种方式是服务实例主动向 Nacos Server 发起请求获取实例信息,属于主动查询的方式。

除此之外,还有一种 Nacos Server 主动向应用进程推送的更新方式。这部分源码在 com.alibaba.nacos.client.naming.core.PushReceiver 类中,PushReceiver 类实现了 Runnable 接口,其 run() 方法的源码及注释如下:

public void run() {
    // closed默认为false,只要当前应用不关闭,就会一直循环执行下面的代码
    while (!closed) {
        try {
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // 接收 Nacos Server 推送的数据
            udpSocket.receive(packet);

            // 解析数据
            String json = new String(IOUtils.tryDecompress(packet.getData()),
                    StandardCharsets.UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " +
                    packet.getAddress().toString());

            // 类型转换
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;

            // 当 pushPacket.type 为 dom 或 service 时,调用 processServiceJson
            // 方法更新 ServiceInfoMap 中的数据
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJson(pushPacket.data);

                // send ack to server
                ack = "{\"type\":\"push-ack\",\"lastRefTime\":\"" +
                        pushPacket.lastRefTime + "\",\"data\":\"\"}";
                sendAck(ack, packet.getSocketAddress());
            } else if ("dump".equals(pushPacket.type)) {
                // 当 pushPacket.type 为 dump 时,将本地的数据发送给 Nacos Server
                // dump data to server
                ack = "{\"type\":\"dump-ack\",\"lastRefTime\":\"" +
                        pushPacket.lastRefTime + "\",\"data\":\"" +
                        StringUtil.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) +
                        "\"}";
                sendAck(ack, packet.getSocketAddress());
            } else {
                // do nothing send ack only
                ack = "{\"type\":\"unknown-ack\",\"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\",\"data\":\"" + "\"}";
            }

            // 向 Nacos Server 发送响应
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8),
                    ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

以上就是服务发现机制中实例信息的更新流程,既有主动请求 Nacos Server 的方式,也有被动地接收 Nacos Server 数据推送的方式。这两种方式是同时存在的,共同保障 serviceInfoMap 中数据的准确性。

笔者整理了一张服务发现机制的简图,可以帮助读者更好地理解这部分知识,如图 6-24 所示。

image 2025 04 16 13 35 41 209
Figure 10. 图6-24 服务与 Nacos 之间服务发现机制的简图

通过实际的编码和源码讲解,读者应该能体会到一件事:使用 Spring Cloud Alibaba 套件,服务注册和服务发现都比较简单,编码也很方便、简洁。开发人员对很多功能是无感知的。通过源码分析才发现,原来代码底层做了如此多的工作。建议读者打上断点,按照本节中提到的流程、具体的实现类和方法,多运行几遍流程,多看几次源码,这样才能更好地理解服务注册与服务发现的底层原理。