使用Kafka的消息

Apache Kafka是我们在本章研究的最新的消息方案。乍看上去,Kafka是与ActiveMQ、Artemis、Rabbit类似的消息代理,但是,Kafka有一些独特的技巧。

按照设计,Kafka是集群运行的,能够实现很强的可扩展性。通过将主题在集群的所有实例上进行分区(partition),它具有更强的弹性。RabbitMQ主要处理交换机中的队列,而Kafka使用主题实现消息的发布和订阅。

Kafka主题会被复制到集群的所有代理上。集群中的每个节点会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。

更进一步来讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会拆分到所有节点。图9.3阐述了它的运行方式。

image 2024 03 13 23 41 06 095
Figure 1. 图9.3 Kafka集群由多个代理组成,每个代理作为主题分区的首领

关于Kafka的独特架构,我建议你阅读Dylan Scott编写的Kafka in Action(Manning,2017年)。在本节,我们会关注如何通过Spring发送和接收Kafka的消息。

为Spring搭建支持Kafka消息的环境

为了搭建Kafka的消息环境,需要在构建文件中添加对应的依赖。但是,与JMS和RabbitMQ方案不同,并没有针对Kafka的Spring Boot starter。不过,不用担心,我们只需要添加一项依赖:

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>

这项依赖会为我们的项目引入Kafka所需的内容。另外,它的出现会触发Spring Boot对Kafka的自动配置,还会在Spring应用上下文中创建一个KafkaTemplate。我们所需要做的就是注入KafkaTemplate并使用它来发布和接收消息。

但是,在发送和接收消息之前,我们还需要注意Kafka的一些特性。具体来讲,KafkaTemplate默认会使用localhost上监听9092端口的Kafka代理。开发应用时,在本地启动Kafka代理没有什么问题,但是在投入生产时,需要配置不同的主机和端口。

安装Kafka集群

想要运行本章代码,需要拥有一个Kafka集群。借助Kafka文档,我们能够很好地掌握如何在本地运行Kafka。

spring.kafka.bootstrap-servers属性能够设置一个或多个Kafka服务器的地址,系统将会使用它来建立到Kafka集群的初始连接。例如,如果集群中有某个服务器运行在kafka.tacocloud.com上并监听9092端口,我们可以按照如下的方式在YAML中配置它的位置:

spring:
  kafka:
    bootstrap-servers:
    - kafka.tacocloud.com:9092

但是需要注意,spring.kafka.bootstrap-servers是复数形式,能接受列表。所以,我们可以提供集群中的多个Kafka服务器:

spring:
  kafka:
    bootstrap-servers:
    - kafka.tacocloud.com:9092
    - kafka.tacocloud.com:9093
    - kafka.tacocloud.com:9094

这些配置适用于名为kafka.tacocloud.com的主机上的Kafka服务器。若想在本地运行Kafka(通常会在开发期这样做),需要将其配置为localhost,如下所示:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092

Kafka在项目中准备就绪之后,我们就可以发送和接收消息了。我们首先使用KafkaTemplate发送TacoOrder对象到Kafka中。

通过KafkaTemplate发送消息

KafkaTemplate在很多方面都与JMS和RabbitMQ对应的模板非常相似,但也有一些差异。在发送消息的时候,这一点非常明显:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
                                  Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
                  Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
                                               K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
                                     Long timestamp, K key, V data);

我们首先可能会发现,这里没有convertAndSend()方法。这是因为,KafkaTemplate是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样一来,所有的send()方法都完成了convertAndSend()的任务。

你可能也会注意到,send()和sendDefault()的参数与JMS和Rabbit有很大的差异。使用Kafka发送消息时,可以使用如下参数设置消息该如何发送:

  • 消息要发送到的主题(send()方法的必选参数);

  • 主题要写入的分区(可选);

  • 记录上要发送的key(可选);

  • 时间戳(可选,默认为System.currentTimeMillis());

  • 载荷(必选)。

主题和载荷是其中最重要的两个参数。分区和key对于如何使用KafkaTemplate几乎没有影响,只是作为额外的信息提供给send()和sendDefault()。对于我们的场景,我们只关心将消息载荷发送到给定的主题,不用担心分区和key的问题。

对于send()方法,我们还可以选择发送一个ProducerRecord对象。这是一个简单类型,将上述的参数放到了一个对象中。我们还可以发送Message对象,但这需要我们将领域对象转换成Message对象。相比创建和发送ProducerRecord和Message对象,使用其他方法会更简单一些。

借助KafkaTemplate及其send()方法,我们可以编写一个基于Kafka实现的OrderMessagingService实现。程序清单9.8展现了该实现类。

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import tacos.TacoOrder;

@Service
public class KafkaOrderMessagingService
                                  implements OrderMessagingService {

  private KafkaTemplate<String, TacoOrder> kafkaTemplate;

  @Autowired
  public KafkaOrderMessagingService(
          KafkaTemplate<String, TacoOrder> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @Override
  public void sendOrder(TacoOrder order) {
    kafkaTemplate.send("tacocloud.orders.topic", order);
  }

}

在这个OrderMessagingService的新实现中,sendOrder()使用被注入的KafkaTemplate对象的send()方法,将TacoOrder发送到名为tacocloud.orders.topic的主题中。除了代码中随处可见的“Kafka”之外,这其实与为JMS和Rabbit编写的代码并没有太大的差异。与OrderMessagingService其他的实现类似,它可以被注入OrderApiController,我们通过“/api/orders”端点提交订单时,就可以向Kafka发送订单了。

创建Kafka版本的消息接收者之前,我们可以使用一个控制台来查看发送的消息。有多个这样的Kafka管理控制台供我们选择,包括Offset Explorer和Confluent的Apache Kafka UI。

如果想要设置默认主题,可以稍微简化一下sendOrder()。首先,通过spring.kafka.template. default-topic属性,可以将默认主题设置为tacocloud.orders.topic:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: tacocloud.orders.topic

然后,在sendOrder()方法中,就可以调用sendDefault()而不是send()了。这样不需要指定主题的名称:

@Override
public void sendOrder(TacoOrder order) {
  kafkaTemplate.sendDefault(order);
}

现在,我们已经编写完发送消息的代码了,接下来,我们转移一下注意力,编写从Kafka中接收消息的代码。

编写Kafka监听器

除了send()和sendDefault()特有的方法签名,KafkaTemplate与JmsTemplate(或RabbitTemplate)的另外一个不同之处在于前者没有提供接收消息的方法。这意味着在Spring中,想要消费来自Kafka主题的消息只有一种办法,那就是编写消息监听器。

对于Kafka消息,消息监听器是通过带有@KafkaListener注解的方法来实现的。@KafkaListener大致对应于@JmsListener和@RabbitListener,并且二者的使用方式也基本相同。程序清单9.9展示了为Kafka编写的基于监听器的订单接收器。

程序清单9.9 使用@KafkaListener接收订单
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {

  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @KafkaListener(topics = "tacocloud.orders.topic")
  public void handle(TacoOrder order) {
    ui.displayOrder(order);
  }

}

handle()方法使用了@KafkaListener注解,表明当有消息抵达名为tacocloud.orders.topic的主题时,该方法将会被调用。程序清单9.9中,我们只将TacoOrder(载荷)对象传递给了handle()方法。但是,如果想要获取消息中其他的元数据,我们也可以接受ConsumerRecord或Message对象。

例如,如下的handle()实现了接受ConsumerRecord,这样我们就能在日志中将消息的分区和时间戳记录下来:

@KafkaListener(topics = "tacocloud.orders.topic")
public void handle(
        TacoOrder order, ConsumerRecord<String, TacoOrder> record) {
  log.info("Received from partition {} with timestamp {}",
      record.partition(), record.timestamp());
  ui.displayOrder(order);
}

类似地,使用Message对象替代ConsumerRecord,也能够达到相同的目的:

@KafkaListener(topics = "tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
  MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
      headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
      headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}

值得一提的是,消息载荷也可以通过ConsumerRecord.value()或Message.getPayload()获取。这意味着我们可以通过这些对象获取TacoOrder,而不必直接将其作为handle()的参数。

小结

  • 异步消息在需要通信的应用程序之间提供了一个中间层,这样能够实现更松散的耦合和更强的可扩展性。

  • Spring支持使用JMS、RabbitMQ或Apache Kafka实现异步消息。

  • 应用程序可以使用基于模板的客户端(JmsTemplate、RabbitTemplate或KafkaTemplate)向消息代理发送消息。

  • 接收消息的应用程序可以借助基于模板的客户端拉取模式消费消息。

  • 借助消息监听器注解(@JmsListener、@RabbitListener或@KafkaListener),消息也可以推送至消费者的bean方法中。