若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群html
咱们以一个订单流转流程来举例,例如订单子系统建立订单,须要将订单数据下发到其余子系统(与第三方系统对接)这个场景,咱们一般会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤一般为:java
一、方案一
伪代码以下:
方案弊端:web
方案二:
伪代码以下:
而后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。
而后经过定时任务,扫描待发送,结合建立时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
方案弊端:
一、消息有可能重复发送,但在消费端能够经过惟一业务编号来进行去重设计。
二、实现过于复杂,为了不极端状况下的消息丢失,须要使用定时任务。数据库
方案三:基于RocketMQ4.3版本事务消息
额外须要实现事务会查监听器:TransactionListener,其实例代码:apache
package org.apache.rocketmq.example.transaction; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("unused") public class OrderTransactionListenerImpl implements TransactionListener { private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>(); private final static int MAX_COUNT = 5; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务惟一ID。 // 将bizUniNo入库,表名:t_message_transaction,表结构 bizUniNo(主键),业务类型。 return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = 0; // 从数据库查查询t_message_transaction表,若是该表中存在记录,则提交, String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务惟一ID。 // 而后t_message_transaction 表,是否存在bizUniNo,若是存在,则返回COMMIT_MESSAGE, // 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE if(query(bizUniNo) > 0 ) { return LocalTransactionState.ROLLBACK_MESSAGE; } return rollBackOrUnown(bizUniNo); } public int query(String bizUniNo) { return 1; //select count(1) from t_message_transaction a where a.biz_uni_no=#{bizUniNo} } public LocalTransactionState rollBackOrUnown(String bizUniNo) { Integer num = countHashMap.get(bizUniNo); if(num != null && ++num > MAX_COUNT) { countHashMap.remove(bizUniNo); return LocalTransactionState.ROLLBACK_MESSAGE; } if(num == null) { num = new Integer(1); } countHashMap.put(bizUniNo, num); return LocalTransactionState.UNKNOW; } }
TransactionListener 实现要点:markdown
在这里主要是t_message_transaction添加一条记录,在事务会查时,若是存在记录,就认为是该消息须要提交。异步
本节的分享就到此结束了,本文主要是考虑在使用消息中间件时,若是保证不丢消息的一些实践思考。ide