先写库还是先发消息
首先,咱们来看一下工作场景,订单ID1030被创建后要保存到数据库,同时该1030订单通过MQ投递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致,要怎么做?这里有两种常见做法:
● 第一种,先写库,再发送数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
orderDao.insert(1030,order);
orderDetailDao.insert(10081,1030,orderDetail1); orderDetailDao.insert(10082,1030,orderDetail2); orderDetailDao.insert(10083,1030,orderDetail3);
SendResult result = producer.send(orderMessage) if (result.getState().equals("SEND_OK"))) { connection.commit(); } else { connection.rollback(); }
|
- 如果生产者发送消息时,因为网络原因导致10秒消息才返回SendResult结果,这就意味这10秒内数据库事务无法提交,大量并发下,数据库连接资源会在这10秒内迅速耗尽,后续请求进入连接池等待状态,最终导致系统停止响应。
- 事务还没有提交,消息就已经被消费者消费了,消费者做数据检查时会报错,导致需要等到下次重试才能被消费者消费成功,大量出现这种消息,可能造成消息积压。
● 第二种,先发消息,再写库
1 2 3 4 5 6 7 8 9 10 11 12
|
SendResult result = producer.send(orderMessage) if(result.getState().equals("SEND_OK"))){ orderDao.insert(1030,order); orderDetailDao.insert(10081,1030,orderDetail1); orderDetailDao.insert(10082,1030,orderDetail2); orderDetailDao.insert(10083,1030,orderDetail3); connection.commit; }
|
问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为1030订单自动设置了“快递信息”,可是如果后续orderDao向数据库插入数据产生异常导致业务失败。我们还需要再次发送“取消1030订单”的消息把下游1030订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。
RocketMQ事务消息
● 架构拓扑

● 代码案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class MessageType4 { public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setNamesrvAddr("192.168.31.103:9876"); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("check-transaction-thread"); return thread; } }); producer.setExecutorService(cachedThreadPool); TransactionListener transactionListener = new OrderTransactionListenerImpl(); producer.setTransactionListener(transactionListener); producer.start(); Message msg = new Message("order","order-1030", "1030", "1030订单与明细的完整JSON数据(略)".getBytes()); producer.sendMessageInTransaction(msg, null); } }
public class OrderTransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("正在执行本地事务,订单编号:" + msg.getKeys());
log.info("模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW"); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String keys = msg.getKeys(); log.info("触发回查,正在检查" + keys + "订单状态");
log.info("回查结果," + keys + "订单已入库,发送Commit指令"); return LocalTransactionState.COMMIT_MESSAGE; } }
|
RocketMQ事务执行过程
● producer.sendMessageInTransaction(msg, null); 执行成功
此时1030订单消息已被发送到MQ服务器(Broker),不过该消息在Broker此时状态为“half-message”,相当于存储在MQ中的“临时消息”,此状态下消息无法被投递给消费者。

● 生产者发送消息成功后自动触发
OrderTransactionListenerImpl.executeLocalTransaction()执行本地事务。
当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交commit,同时executeLocalTransaction方法返回COMMIT_MESSAGE,生产者会再次向MQ服务器发送一个commit提交消息,此前在Broker中保存1030订单消息状态就从“half-message”变为”已提交”,broker将消息发给下游的消费者处理。

1 2 3 4 5 6 7 8 9 10 11 12 13
| public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
● 异常流程1:producer.sendMessageInTransaction(msg, null); 执行失败,抛出异常
此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。

● 异常流程2:producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
此时本地事务执行rollback回滚,数据库数据被撤销,同时executeLocalTransaction方法返回ROLLBACK_MESSAGE代表回滚,生产者会再次向MQ服务器发送一个rollback回滚消息,此前在Broker中保存1030订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与MQ消息一致。

● 异常流程3:producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行成功,但给Broker返回Commit消息时断网了,导致broker无法收到提交指令。
1 2 3 4 5 6 7 8 9 10 11 12
| public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
此时本地数据库订单数据已入库,但MQ因为断网无法收到生产者的发来的“commit”消息,1030订单数据一直处于“half message”的状态,消息无法被投递到消费者,本地事务与MQ消息的一致性被破坏。

RocketMQ为了解决这个问题,设计了回查机制,对于broker中的half message,每过一小段时间就自动尝试与生产者通信,试图调用通OrderTransactionListenerImpl.checkLocalTransaction()方法确认之前的本地事务是否成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public LocalTransactionState checkLocalTransaction(MessageExt msg) { String keys = msg.getKeys(); log.info("触发回查,正在检查" + keys + "订单状态");
log.info("回查结果," + keys + "订单已入库,发送Commit指令"); return LocalTransactionState.COMMIT_MESSAGE; }
|
由MQ服务器主动发起,生产者调用OrderTransactionListenerImpl.checkLocalTransaction()检查之前数据库事务是否完成。

checkLocalTransaction()查询到订单数据,说明之前的数据库事务已经完成,返回COMMIT_MESSAGE,这样Broker中的1030订单消息就可以被发送给消费者进行处理。

1 2 3 4 5
| 运行结果: 22:31:35.670 [main] INFO com.lixiang.rocketmq.mtype.OrderTransactionListenerImpl - 正在执行本地事务,订单编号:1030 22:31:35.672 [main] INFO com.lixiang.rocketmq.mtype.OrderTransactionListenerImpl - 模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW 22:31:45.995 [check-transaction-thread] INFO com.lixiang.rocketmq.mtype.OrderTransactionListenerImpl - 触发回查,正在检查1030订单状态 22:31:45.996 [check-transaction-thread] INFO com.lixiang.rocketmq.mtype.OrderTransactionListenerImpl - 回查结果,1030订单已入库,发送Commit指令
|
checkLocalTransaction()未查询到订单数据,说明之前的数据库事务没有处理成功,返回ROLLBACK_MESSAGE,这样Broker中的1030订单消息就会被删除。

RocketMQ事务消息执行执行流程
