事务消息
我们以电商下单场景为例模拟事务消息的使用方法。订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)。我们通常会将两个系统进行解耦,不会直接使用服务调用的方式进行交互,业务实现步骤如下。
1)系统创建订单并入库。 2)发送消息到MQ。 3)消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。
方案一
方案一如代码清单11-11所示。
@SuppressWarnings("rawtypes")
public Map createOrder() {
Map result = new HashMap();
// 执行下单相关的业务流程,例如操作本地数据库落库
// 调用消息发送端API发送消息
// 返回结果,提交事务
return result;
}
方案一的弊端如下。
1)如果消息发送成功,但在提交事务的时候JVM突然挂掉,导致事务提交失败,那么两个系统之间的数据会不一致。
2)因为消息是在事务提交之前提交的,所以发送的消息内容是订单实体的内容,在消费端进行消费时,可能会出现订单不存在的情况。
方案二
由于存在上述问题,在 RocketMQ 不支持事务消息的前提条件下,可以采用代码清单11-12所示的方式进行优化。
@SuppressWarnings("rawtypes")
public Map createOrder() {
Map result = new HashMap();
// 执行下单相关的业务流程,例如操作本地数据库落库
// 生成事务消息唯一业务标识,将该业务标识组装到待发送的消息体中
// 向待发送消息表中插入一条记录,内容包括本次唯一消息发送业务ID、消息JSON{消息主题、消息tag、消息体}、创建时间、发送状态
// 将消息体返回到控制器层
// 返回结果,提交事务
return result;
}
在控制器层异步发送消息的同时需要引入定时机制,用于扫描消息发送记录,避免消息丢失。
方案二的弊端如下。
1)消息有可能重复发送,但在消费端可以通过唯一业务编号进行去重设计。 2)实现过于复杂,为了避免极端情况下丢失消息,需要使用定时任务。
方案三
方案三基于RocketMQ 4.3版的事务消息实现,如代码清单11-13所示。
import com.alibaba.fastjson.JSON;
import com.example.demo.entry.Order;
import com.example.demo.help.SpringContextUtils;
import com.example.demo.service.OrderService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent .*;
@Servicepublic
class OrderServiceImpl implements OrderService {
private TransactionMQProducer mqProducer;
public OrderServiceImpl() {
mqProducer = new TransactionMQProducer("order_producer_grpup");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFacto ry() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("order - producer - grpup_msg - check - thread");
return thread;
}
});
mqProducer.setExecutorService(executorService); // 设置事务消息回调监听器
mqProducer.setTransactionListener(SpringContextUtils.getBean("orderMessageListener"));
try {
mqProducer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
@Override
public Map saveOrCreateOrder(Order order) {
Map result = new HashMap();
if (order.getBuyerId() == null || order.getBuyerId() < 1) {
result.put("code", "1");
result.put("msg", "用户购买者不能为空");
return result;
}
// 省略其他业务类校验
try {
mqProducer.send(new Message("topic_order", JSON.toJSONString(order).getBytes()));
} catch (Throwable e) {
e.printStackTrace();
//可以进行一些重试,在这里直接返回错误
result.put("code", "1");
result.put("msg", "系统异常");
return result;
}
result.put(" code", 0);
return result;
}
}
代码清单11-13主要是完成业务类规则的校验,然后发送一条消息到RocketMQ,业务的具体逻辑在事务消息回调函数中实现,具体在TransactionListener监听器中实现,如代码清单11-14所示。
import com.alibaba.fastjson.JSON;
import com.example.demo.entry.Order;
import com.example.demo.entry.OrderTransLog;
import com.example.demo.mapper.OrderMapper;
import com.example.demo.mapper.OrderTransLogMapper;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Servicepublic
class OrderMessageListener implements TransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderTransLogMapper orderTransLogMapper;
/**
* 温馨提示:该方法需要被包含在事务中
*
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
// 进行一系列业务处理
orderMapper.insertOrder(order);
OrderTransLog log = new OrderTransLog();
log.setUnionCode(order.getOrderNo());
log.setCreateDate(new Date(System.currentTimeMillis()));
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
if (orderTransLogMapper.count(order.getOrderNo()) > 0) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
/**
* 事务消息在回查指定次数后,会自动回滚该消息
*/
return LocalTransactionState.UNKNOW;
}
}
}
TransactionListener 实现的要点如下。 1)executeLocalTransaction:该方法实现具体的业务逻辑,包含记录本地事务状态。主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如在OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。故在这里,主要是向t_message_transaction添加一条记录,在事务回查时,如果存在记录,就认为是该消息需要提交,其返回值建议返回LocalTransactionState.UNKNOW。
2)checkLocalTransaction:该方法主要告知RocketMQ消息是否需要提交或者回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交;如果不存在,返回事务状态未知。如果在指定次数内还是未查到消息,RocketMQ将自动回滚该消息,默认为15次,可自定义。
基于实际场景的事务消息实战完整代码已经上传到笔者维护的GitHub仓库,欢迎获取( https://github.com/dingwpmz/rocketmq-learning )。