Spring Cloud整合RocketMQ
代码清单11-14 TransactionListener监听器实现示例
代码清单11-16 POM文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
Maven build descriptor for the test-rocketmq sample module.
Packages the project as a jar and pulls in Spring Boot (runtime + test starter)
and the RocketMQ client library.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates -->
<groupId>com.rocketmq</groupId>
<artifactId>test-rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>test-rocketmq</name>
<url>http://maven.apache.org</url>
<description>RocketMQ</description>
<!-- Shared build settings and dependency versions, referenced below as ${...} -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Java language level passed to maven-compiler-plugin as source/target -->
<java_source_version>1.8</java_source_version>
<java_target_version>1.8</java_target_version>
<!-- Source file encoding passed to maven-compiler-plugin -->
<file_encoding>UTF-8</file_encoding>
<!-- Version applied to both Spring Boot starters -->
<springboot.version>2.1.3.RELEASE</springboot.version>
<!-- Version of the rocketmq-client artifact -->
<rocketmq.version>4.6.0</rocketmq.version>
</properties>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot.version}</version>
</dependency>
<!-- Spring Boot test support; limited to the test classpath -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiler settings: forked javac using the source/target level and encoding defined in <properties> -->
<!-- NOTE(review): no <version> is declared for this plugin here; confirm it is pinned elsewhere or add one -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<fork>true</fork>
<source>${java_source_version}</source>
<target>${java_target_version}</target>
<encoding>${file_encoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
第二步:Spring Boot启动类非常简单,只要加上@Configuration和@EnableAutoConfiguration注解即可,如代码清单11-17所示。
代码清单11-17 Spring Boot启动类
package com.rocketmq.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
 * Spring Boot entry point for the RocketMQ sample application.
 *
 * <p>Enables auto-configuration and component scanning of the
 * {@code com.rocketmq.test} package.
 */
@Configuration
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.rocketmq.test"})
public class RocketMQApplicationMain {

    /**
     * Starts the Spring application using this class as the primary configuration source.
     *
     * @param args command-line arguments forwarded to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(RocketMQApplicationMain.class, args);
    }
}
Spring Boot启动还需要一个application.yml配置文件,配置的内容如下。
# RocketMQ settings for the sample application.
spring:
  rocketmq:
    # Name server address. The key must match the placeholder
    # ${spring.rocketmq.namesrvAddr} used by the @Value injections exactly,
    # including letter case.
    namesrvAddr: localhost:29876
    # NOTE(review): the producer and consumer listings hard-code their own group
    # names and do not read these two keys; confirm whether they are still needed.
    producerGroup: TestProducer
    consumerGroup: TestConsumer
第三步:生产者和消费者的代码,如代码清单11-18、代码清单11-19所示。
代码清单11-18 生产者代码
package com.rocketmq.test;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * RocketMQ producer component.
 *
 * <p>Owns a single {@link DefaultMQProducer} (group {@code TestRocketMQProducer})
 * that is started after bean construction and shut down before bean destruction.
 */
@Component
public class MQProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);

    /** Name server address, injected from the {@code spring.rocketmq.namesrvAddr} property. */
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQProducer producer = new DefaultMQProducer("TestRocketMQProducer");

    /**
     * Configures the name server address and starts the producer.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} if the producer fails to start
     */
    @PostConstruct
    public void start() {
        try {
            LOGGER.info("MQ:启动生产者");
            producer.setNamesrvAddr(namesrvAddr);
            producer.start();
        } catch (MQClientException e) {
            LOGGER.error("MQ:启动生产者失败:{}-{}", e.getResponseCode(), e.getErrorMessage());
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Sends a message with a callback; the outcome is reported through the log.
     *
     * <p>Failures are logged rather than propagated to the caller.
     *
     * @param data  message content
     * @param topic destination topic
     * @param tags  message tags
     * @param keys  unique business key
     */
    public void sendMessage(String data, String topic, String tags, String keys) {
        try {
            byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message mqMsg = new Message(topic, tags, keys, messageBody);
            producer.send(mqMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info("MQ: 生产者发送消息 {}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    LOGGER.error(throwable.getMessage(), throwable);
                }
            });
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error(e.getMessage(), e);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Shuts the producer down when the bean is destroyed.
     */
    @PreDestroy
    public void stop() {
        // The producer field is final and always initialised, so no null check is needed.
        producer.shutdown();
        LOGGER.info("MQ:关闭生产者");
    }
}
代码清单11-19 消费者代码
package com.rocketmq.test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
/**
 * RocketMQ push consumer component.
 *
 * <p>Owns a single {@link DefaultMQPushConsumer} (group {@code TestRocketMQPushConsumer})
 * subscribed to {@code TopicTest} and registers itself as the concurrent message listener.
 */
@Component
public class MQPushConsumer implements MessageListenerConcurrently {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class);

    /** Name server address, injected from the {@code spring.rocketmq.namesrvAddr} property. */
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestRocketMQPushConsumer");

    /**
     * Configures the consumer, subscribes to the topic and starts consuming.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} if the consumer fails to start
     */
    @PostConstruct
    public void start() {
        try {
            LOGGER.info("MQ:启动消费者");
            consumer.setNamesrvAddr(namesrvAddr);
            // Start consuming from the head of the queue.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Clustering consumption mode.
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // Subscribe to every tag of the topic.
            consumer.subscribe("TopicTest", "*");
            // Register this instance as the message listener.
            consumer.registerMessageListener(this);
            // Start the consumer.
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.error("MQ:启动消费者失败:{}-{}", e.getResponseCode(), e.getErrorMessage());
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Logs each received message in order.
     *
     * <p>If handling a message throws, processing of the batch stops and the ack index is
     * set to the last message that was handled successfully, so that the failed message
     * and everything after it are not acknowledged.
     *
     * @param msgs    batch of messages to consume
     * @param context consume context used to report the ack index
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}; partial failure is
     *         reported through the ack index
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (int index = 0; index < msgs.size(); index++) {
            try {
                MessageExt msg = msgs.get(index);
                String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                LOGGER.info("MQ:消费者接收新信息: {} {} {} {} {}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                // Messages 0..index-1 succeeded; the ack index points at the last success
                // (-1 when the very first message failed).
                context.setAckIndex(index - 1);
                break;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Shuts the consumer down when the bean is destroyed.
     */
    @PreDestroy
    public void stop() {
        // The consumer field is final and always initialised, so no null check is needed.
        consumer.shutdown();
        LOGGER.info("MQ:关闭消费者");
    }
}
第四步:测试代码如代码清单11-20所示。
代码清单11-20 测试代码
package com.rocketmq.test;

// MQProducer is declared in this same package (com.rocketmq.test), so it needs no import.
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * Integration test that boots the sample application and sends messages through
 * {@link MQProducer}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RocketMQApplicationMain.class})
public class TestRocketMQ {

    @Resource
    private MQProducer mqProducer;

    /**
     * Sends ten messages to {@code TopicTest} and then waits ten seconds before the
     * test (and with it the application context) finishes.
     */
    @Test
    public void testProducer() {
        for (int i = 0; i < 10; i++) {
            mqProducer.sendMessage("Hello RocketMQ " + i, "TopicTest", "TagTest", "Key" + i);
        }
        try {
            // Presumably gives the callback-based sends and the consumer time to run
            // before the context shuts down.
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of printing the stack trace and discarding it.
            Thread.currentThread().interrupt();
        }
    }
}
第五步:测试结果如代码清单11-21所示。
代码清单11-21 测试结果