使用RabbitMQ和AMQP
RabbitMQ可以说是AMQP最杰出的实现,它提供了比JMS更高级的消息路由策略。JMS消息需要使用目的地名称来寻址,接收者会从这里检索消息,而AMQP消息使用交换机(exchange)和路由键(routing key)来寻址,这样可以使消息与接收者要监听的队列解耦。交换机和队列的关系如图9.2所示。

消息抵达RabbitMQ代理时,会进入为其设置的交换机上。交换机负责将它路由到一个或多个队列中,这个过程会基于交换机的类型、交换机与队列间的绑定,以及消息的路由键进行。
这方面有多个不同类型的交换机,具体如下。
-
Default:这是代理创建的特殊交换机。它会将消息路由至名字与消息的路由键相同的队列。所有的队列都会自动绑定至Default类型的交换机。
-
Direct:如果消息的路由键与队列的绑定键相同,消息会路由到该队列上。
-
Topic:如果消息的路由键与队列的绑定键(可能会包含通配符)匹配,消息会路由到一个或多个这样的队列上。
-
Fanout:不管路由键和绑定键是什么,消息都会路由到所有绑定队列上。
-
Headers:与Topic类型类似,只不过要基于消息的头信息进行路由,而不是路由键。
-
Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的交换机和队列的绑定关系)的消息。
最简单的交换机形式是Default和Fanout,因为它们大致对应了JMS中的队列和主题。但是其他的交换机允许我们定义更加灵活的路由模式。
这里最重要的是要明白消息会通过路由键发送至交换机,而消息要在队列中被消费。它们如何从交换机路由至队列取决于绑定的定义,以及哪种方式最适合我们的使用场景。
至于使用哪种交换机类型,以及如何定义从Exchange到队列的绑定,这本身与如何在Spring应用中发送和接收消息关系不大。因此,我们更关心如何编写使用Rabbit发送和接收消息的代码。
注意:关于如何绑定队列到交换机的更详细讨论,请参考Gavin Roy编写的RabbitMQ in Depth(Manning,2017年)或者Alvaro Videla和Jason J.W.Williams合著的RabbitMQ in Action(Manning,2012年)。 |
添加RabbitMQ到Spring中
在使用Spring发送和接收RabbitMQ消息之前,我们需要将Spring Boot的AMQP starter依赖添加到构建文件中,替换上文中的Artemis或ActiveMQ starter:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加AMQP starter到构建文件中将会触发自动配置功能,这样会为我们创建一个AMQP连接工厂、RabbitTemplate bean及其他支撑组件。要使用Spring发送和接收RabbitMQ代理的消息,只需要添加这项依赖。但是,还有一些有用的属性需要我们掌握,如表9.4所示。

对于开发来说,我们可能会使用不需要认证的RabbitMQ代理,它运行在本地机器上并监听5672端口。在开发阶段,这些属性可能没有太大的用处,但是当应用程序投入生产环境时,它们无疑是非常有用的。
运行RabbitMQ代理
如果没有可供使用的RabbitMQ代理,有多种在本地机器上运行RabbitMQ的方案。请参阅官方的RabbitMQ文档以了解运行RabbitMQ的最新指南。
例如,假设我们要将应用投入生产环境,RabbitMQ代理位于名为rabbit.tacocloud.com服务器上,监听5673端口并且需要认证。在这种情况下,当prod profile处于激活状态时,application.yml文件中的如下配置将会设置这些属性:
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n
在我们的应用中,RabbitMQ已经配置好了,接下来就可以使用RabbitTemplate发送消息了。
通过RabbitTemplate发送消息
Spring对RabbitMQ消息支持的核心是RabbitTemplate。RabbitTemplate与JmsTemplate类似,提供了一组相似的方法。但是,我们会看到,这里有一些细微的差异。这与RabbitMQ独特的运行方式有关。
在使用RabbitTemplate发送消息方面,可以使用与JmsTemplate中同名的send()和convertAndSend()方法。但是,与JmsTemplate的方法只能将消息路由至队列或主题不同,RabbitTemplate会按照交换机和绑定键来发送消息。如下列出了关于使用RabbitTemplate发送消息最重要的一些方法:
// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)
throws AmqpException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message) throws AmqpException;
// 发送根据对象转换而成的消息,且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;
我们可以看到,这些方法与JmsTemplate中对应的方法遵循了相同的模式。开始的3个send()方法都是发送原始的Message对象。接下来的3个convertAndSend()方法会接受一个对象,这个对象会在发送之前在幕后转换成Message。最后的3个convertAndSend()方法与之类似,但还会接受一个MessagePostProcessor对象,这个对象能够在Message发送至代理之前对其进行操作。
这些方法与JmsTemplate对应方法的不同之处在于,它们会接受String类型的值以指定交换机和路由键,而不像JmsTemplate那样接受目的地名称(或Destination)。没有接受交换机参数的方法会将消息发送至Default交换机。与之类似,如果没有指定路由键的方法,消息会被路由至默认的路由键。
接下来,我们看一下如何使用RabbitTemplate发送taco订单。有种方式是使用send()方法,如程序清单9.5所示。但是,在调用send()之前,需要将TacoOrder对象转换为Message。RabbitTemplate能够通过getMessageConverter()方法获取其消息转换器,否则,这项工作会非常无聊。
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import
org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService
implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(TacoOrder order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}
你可能已经注意到,RabbitOrderMessagingService实现了OrderMessagingService接口,这与JmsOrderMessagingService类似。这意味着我们可以按照相同的方式将它注入OrderApiController,在提交订单时发送订单消息。我们目前还无法接收这些消息,但是可以使用基于浏览器的RabbitMQ管理控制台。
有了MessageConverter之后,将TacoOrder转换成Message就是非常简单的任务了。我们必须要通过MessageProperties来提供消息属性,但是如果我们不需要设置任何这样的属性,可以使用默认的MessageProperties实例。剩下的就是调用send(),并将交换机和路由键(这两者都是可选的)连同消息一起传递过去。在本例中,我们只指定了路由键(即tacocloud.order)和消息本身,这样一来,就会使用默认的交换机。
至于默认的交换机,它的名字是""(也就是空字符串),这对应了RabbitMQ代理自动生成的Default交换机。与之相似,默认的路由键也是""(它的路由将会取决于交换机以及相应的绑定)。我们可以通过设置spring.rabbitmq.template.exchange和spring.rabbitmq.template.routing-key属性重写这些默认值:
spring:
rabbitmq:
template:
exchange: tacocloud.order
routing-key: kitchens.central
在本例中,所有的未指明交换机的消息会自动发送至名为tacocloud.order的交换机。如果调用send()或convertAndSend()时也没有指定路由键,消息将会使用值为kitchens.central的路由键。
通过消息转换器创建Message对象是非常简单的,但是使用convertAndSend()让RabbitTemplate处理所有的转换操作会更加简单:
public void sendOrder(TacoOrder order) {
rabbit.convertAndSend("tacocloud.order", order);
}
配置消息转换器
默认情况下,消息转换是通过SimpleMessageConverter来实现的,它能够将简单类型(如String)和Serializable对象转换成Message对象。但是,Spring为RabbitTemplate提供了多个消息转换器,如下所示。
-
Jackson2JsonMessageConverter:使用Jackson 2 JSON处理器实现对象和JSON的相互转换。
-
MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行转换。
-
SerializerMessageConverter:使用Spring的Serializer和Deserializer转换String和任意种类的原生对象。
-
SimpleMessageConverter:转换String、字节数组和Serializable类型。
-
ContentTypeDelegatingMessageConverter:基于contentType头信息,将转换功能委托给另外一个MessageConverter。
-
MessagingMessageConverter:将消息转换功能委托给另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter。
如果需要变更消息转换器,可以配置一个类型为MessageConverter的bean。例如,对于基于JSON的转换,我们可以按照如下的方式配置Jackson2JsonMessageConverter:
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
Spring Boot的自动配置功能会发现这个bean,并将它注入RabbitTemplate,替换默认的消息转换器。
设置消息属性
与在JMS中一样,我们可能需要在发送的消息中添加一些头信息,例如为所有通过Taco Cloud Web站点提交的订单添加一个X_ORDER_SOURCE信息。我们自行创建Message时,可以通过MessageProperties实例设置头信息,随后将这个对象传递给消息转换器。回到程序清单9.5的sendOrder()方法,我们需要做的就是添加设置头信息的代码:
public void sendOrder(TacoOrder order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
但是,使用convertAndSend()时,我们无法快速访问MessageProperties对象。不过,此时MessagePostProcessor可以帮助我们:
public void sendOrder(TacoOrder order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
在这里,我们为convertAndSend()提供了一个实现MessagePostProcessor接口的匿名内部类。在postProcessMessage()中,我们从Message中拉取MessageProperties对象,然后通过setHeader()方法设置X_ORDER_SOURCE头信息。
我们已经看到了如何通过RabbitTemplate发送消息,接下来我们转换视角看一下如何接收来自RabbitMQ队列的消息。
接收来自RabbitMQ的消息
使用RabbitTemplate发送消息与使用JmsTemplate发送消息并没有太大差别。实际上,接收来自RabbitMQ队列的消息也与接收来自JMS的消息没有很大差别。
与JMS类似,我们有两个可选方案:
-
使用RabbitTemplate从队列拉取消息;
-
将消息推送至带有@RabbitListener注解的方法中。
我们首先看一下基于拉取的RabbitTemplate.receive()方法。
使用RabbitTemplate接收消息
RabbitTemplate提供了多个从队列拉取消息的方法。其中,较为有用的方法如下所示:
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收由消息转换而成的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis)
throws AmqpException;
// 接收由消息转换而成且类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(
String queueName, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(
long timeoutMillis, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
ParameterizedTypeReference<T> type)
throws AmqpException;
这些方法对应于前文所述的send()和convertAndSend()方法。send()用于发送原始的Message对象,而receive()则会接收来自队列的原始Message对象。与之类似,receiveAndConvert()接收消息,并在返回之前使用一个消息转换器将它们转换为领域对象。
但是,这些方法在方法签名上体现出明显的不同。首先,这些方法都不会接收交换机和路由键作为参数。这是因为交换机和路由键是用来将消息路由至队列的,一旦消息进入队列,它们的目的地就是将它们从队列中拉取下来的消费者。消费消息的应用本身并不需要关心交换机和路由键,只需要知道队列信息。
你可能会注意到,很多方法都接收一个long类型的参数,用来指定接收消息的超时时间。默认情况下,接收消息的超时时间是0毫秒。也就是说,调用receive()会立即返回。如果没有可用消息,返回值是null。这是与JmsTemplate的receive()的一个显著差异。通过传入一个超时时间的值,我们就可以让receive()和receiveAndConvert()阻塞,直到消息抵达或者超时。但是,即便设置了非零的超时时间,代码中依然要处理null返回值的场景。
接下来看一下如何实际使用它们。程序清单9.6展现了一个新的基于Rabbit的OrderReceiver实现,它使用RabbitTemplate来接收订单。
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.order");
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}
}
所有的操作都发生在receiveOrder()方法中。它调用了被注入的RabbitTemplate对象的receive()方法,从名为tacocloud.order的队列中拉取一个订单。它并没有提供超时时间,所以我们只能假定这个调用会马上返回,要么会得到Message对象,要么返回null。如果返回Message对象,我们使用RabbitTemplate中的MessageConverter将Message转换成一个TacoOrder对象;如果receive()方法返回null,我们就将null作为返回值。
根据使用场景,我们也许能够容忍一定的延迟。例如,在Taco Cloud厨房悬挂的显示器中,如果没有订单,我们可以稍等一会儿。假设我们决定等待30秒再放弃。那么可以修改receiveOrder()方法,传递30000毫秒的延迟给receive()方法:
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}
如果你觉得使用这样一个硬编码的数字会让人觉得不舒服,认为更好的方案是创建一个带有@ConfigurationProperties注解的类,并使用Spring Boot的配置属性来设置超时时间,那么在这一点上,我的想法和你一样,只不过Spring Boot已经为我们提供了一个这样的配置属性。想要通过配置来设置超时时间,只需要在调用receive()时移除超时值,并将超时时间设置为spring.rabbitmq.template.receive-timeout属性:
spring:
rabbitmq:
template:
receive-timeout: 30000
回到receiveOrder()方法。我们必须要使用RabbitTemplate中的消息转换器,才能将传入的Message对象转换成TacoOrder对象。但是,RabbitTemplate既然已经携带了消息转换器,为什么不能自动为我们转换呢?这就是receiveAndConvert()方法所做的事情。借助receiveAndConvert(),可以将receiveOrder()重写为:
public TacoOrder receiveOrder() {
return (TacoOrder) rabbit.receiveAndConvert("tacocloud.order.queue");
}
看起来简单了许多,对吧?唯一让我觉得麻烦的就是从Object到TacoOrder的类型转换。不过,这种转换还有另一种实现方式:我们可以传递一个ParameterizedTypeReference引用给receiveAndConvert(),这样就可以直接得到TacoOrder对象了:
public TacoOrder receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}
关于这种方式是否真的比类型转换更好,依然还有争论,但是它确实能够更加确保类型安全。唯一需要注意的是,要在receiveAndConvert()中使用ParameterizedTypeReference,消息转换器必须要实现SmartMessageConverter,目前Jackson2JsonMessageConverter是唯一可选的内置实现。
RabbitTemplate提供的拉取模式适用于很多使用场景,但在另一些场景中,监听消息并在消息抵达时对其进行处理会更好一些。接下来,我们看一下如何编写消息驱动的bean,让它对RabbitMQ消息做出回应。
使用监听器处理RabbitMQ的消息
Spring提供了RabbitListener实现消息驱动的RabbitMQ bean,它对应于JMS的JmsListener。为了声明消息抵达RabbitMQ队列时应该调用某个方法,可以为bean的方法添加@RabbitListener注解。
例如,程序清单9.7展现了OrderReceiver的RabbitMQ实现,它通过注解声明要监听订单消息,而不是使用RabbitTemplate进行轮询。
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tacos.TacoOrder;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(TacoOrder order) {
ui.displayOrder(order);
}
}
你肯定会发现,程序清单9.7与程序清单9.4的代码非常相似。确实如此:从程序清单9.4到程序清单9.7,唯一的变更就是监听器的注解从@JmsListener变成了@RabbitListener。尽管@RabbitListener注解非常棒,但是几乎重复的代码无法体现@RabbitListener具有哪些在@JmsListener中还没有提到的功能。当消息从各自的代理推送过来时,我们可以分别使用这两个注解编写对应的处理逻辑,其中@JmsListener对应的是JMS代理,而@RabbitListener对应的是RabbitMQ代理。
你可能曾对@RabbitListener感到兴味索然,但是这并非我的本意。实际上,@RabbitListener和@JmsListener的运行方式非常相似,这意味着我们使用RabbitMQ替代Artemis或ActiveMQ的时候,不需要学习全新的编程模型。同样令人兴奋的是,RabbitTemplate和JmsTemplate之间也具有这样的相似性。
让我们暂且保持一下这种兴奋,在本章结束之前,我们看一下Spring支持的另一个消息方案:Apache Kafka。