rocketmq支持发送事务消息,即发送消息的事务性,这里便来看一下如何实现?java
实战方面均以
RocketMQTemplate
形式展示,集成方案详见《springboot中rocketmq的集成与使用》git
首先使用@RocketMQTransactionListener
定义一个监听器来模拟执行本地事务和事务会查:github
@Slf4j @RocketMQTransactionListener(txProducerGroup = "tx-group") public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 模拟本地事务不经过 log.info("============== executeLocalTransaction"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 模拟回查本地事务 log.info("============== checkLocalTransaction"); return RocketMQLocalTransactionState.COMMIT; } }
而后再定义一个消费者,设定topic
为“topic-tx”:web
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group") public class TransactionConsumer implements RocketMQListener<Message> { @Override public void onMessage(Message message) { log.info("topic-tx received message: {}", message); } }
最后定义一个消息生产者,向“topic-tx”主题发送消息,而且须要指定gorup
为“tx-group”(同事务监听器所设置的txProducerGroup
一致):spring
@Slf4j public class TransactionProducer { @Resource private RocketMQTemplate rocketMQTemplate; public void produce() { Message<String> message = new Message<>(); message.setId(UUID.randomUUID().toString()); message.setContent("transaction message"); log.info("========sending message========="); rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); log.info("========finish send ========="); } }
跑起项目后,在控制台可看到以下输出:springboot
========sending message========= ============== executeLocalTransaction ========finish send ========= ============== checkLocalTransaction topic-tx received message: Message(id=168486dd-0814-4060-ace1-5a55449b3f72, content=transaction message)