事务消息

我们以电商下单场景为例模拟事务消息的使用方法。订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)。我们通常会将两个系统进行解耦,不会直接使用服务调用的方式进行交互,业务实现步骤如下。

1)系统创建订单并入库。 2)发送消息到MQ。 3)消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。

方案一

方案一如代码清单11-11所示。

@SuppressWarnings("rawtypes")
public Map createOrder() {
    Map result = new HashMap();
    // 执行下单相关的业务流程,例如操作本地数据库落库
    // 调用消息发送端API发送消息
    // 返回结果,提交事务

    return result;
}

方案一的弊端如下。

1)如果消息发送成功,但在提交事务的时候JVM突然挂掉,导致事务提交失败,那么两个系统之间的数据会不一致。

2)因为消息是在事务提交之前提交的,所以发送的消息内容是订单实体的内容,在消费端进行消费时,可能会出现订单不存在的情况。

方案二

由于存在上述问题,在 RocketMQ 不支持事务消息的前提条件下,可以采用代码清单11-12所示的方式进行优化。

代码清单11-12 下单伪代码
@SuppressWarnings("rawtypes")
public Map createOrder() {
    Map result = new HashMap();
    // 执行下单相关的业务流程,例如操作本地数据库落库
    // 生成事务消息唯一业务标识,将该业务标识组装到待发送的消息体中
    // 向待发送消息表中插入一条记录,内容包括本次唯一消息发送业务ID、消息JSON{消息主题、消息tag、消息体}、创建时间、发送状态
    // 将消息体返回到控制器层
    // 返回结果,提交事务
    return result;
}

在控制器层异步发送消息的同时需要引入定时机制,用于扫描消息发送记录,避免消息丢失。

方案二的弊端如下。

1)消息有可能重复发送,但在消费端可以通过唯一业务编号进行去重设计。 2)实现过于复杂,为了避免极端情况下丢失消息,需要使用定时任务。

方案三

方案三基于RocketMQ 4.3版的事务消息实现,如代码清单11-13所示。

代码清单11-13 订单下单伪代码示例
import com.alibaba.fastjson.JSON;
import com.example.demo.entry.Order;
import com.example.demo.help.SpringContextUtils;
import com.example.demo.service.OrderService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent .*;

@Servicepublic
class OrderServiceImpl implements OrderService {
    private TransactionMQProducer mqProducer;

    public OrderServiceImpl() {
        mqProducer = new TransactionMQProducer("order_producer_grpup");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFacto ry() {
            @Override
            public Thread newThread(Runnable r) {

                Thread thread = new Thread(r);
                thread.setName("order - producer - grpup_msg - check - thread");
                return thread;
            }
        });
        mqProducer.setExecutorService(executorService);        // 设置事务消息回调监听器
        mqProducer.setTransactionListener(SpringContextUtils.getBean("orderMessageListener"));
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Map saveOrCreateOrder(Order order) {
        Map result = new HashMap();
        if (order.getBuyerId() == null || order.getBuyerId() < 1) {
            result.put("code", "1");
            result.put("msg", "用户购买者不能为空");
            return result;
        }
        // 省略其他业务类校验
        try {
            mqProducer.send(new Message("topic_order", JSON.toJSONString(order).getBytes()));
        } catch (Throwable e) {
            e.printStackTrace();
            //可以进行一些重试,在这里直接返回错误
            result.put("code", "1");
            result.put("msg", "系统异常");
            return result;
        }
        result.put(" code", 0);
        return result;
    }
}

代码清单11-13主要是完成业务类规则的校验,然后发送一条消息到RocketMQ,业务的具体逻辑在事务消息回调函数中实现,具体在TransactionListener监听器中实现,如代码清单11-14所示。

代码清单11-14 TransactionListener监听器实现示例
import com.alibaba.fastjson.JSON;
import com.example.demo.entry.Order;
import com.example.demo.entry.OrderTransLog;
import com.example.demo.mapper.OrderMapper;
import com.example.demo.mapper.OrderTransLogMapper;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;

@Servicepublic
class OrderMessageListener implements TransactionListener {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private OrderTransLogMapper orderTransLogMapper;

    /**
     * 温馨提示:该方法需要被包含在事务中
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        // 进行一系列业务处理
        orderMapper.insertOrder(order);
        OrderTransLog log = new OrderTransLog();
        log.setUnionCode(order.getOrderNo());
        log.setCreateDate(new Date(System.currentTimeMillis()));
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        if (orderTransLogMapper.count(order.getOrderNo()) > 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            /**
             * 事务消息在回查指定次数后,会自动回滚该消息
             */
            return LocalTransactionState.UNKNOW;
        }
    }
}

TransactionListener 实现的要点如下。 1)executeLocalTransaction:该方法实现具体的业务逻辑,包含记录本地事务状态。主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如在OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。故在这里,主要是向t_message_transaction添加一条记录,在事务回查时,如果存在记录,就认为是该消息需要提交,其返回值建议返回LocalTransactionState.UNKNOW。

2)checkLocalTransaction:该方法主要告知RocketMQ消息是否需要提交或者回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交;如果不存在,返回事务状态未知。如果在指定次数内还是未查到消息,RocketMQ将自动回滚该消息,默认为15次,可自定义。

基于实际场景的事务消息实战完整代码已经上传到笔者维护的GitHub仓库,欢迎获取( https://github.com/dingwpmz/rocketmq-learning )。