RocketMQ事务消息阅读目录指引:
RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想
RocketMQ源码分析之RocketMQ事务消息实现原理上篇
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ事务消息实战web
根据上节Demo示例,发送事务消息的入口为:TransactionMQProducer#sendMessageInTransaction:apache
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { // @1 throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); // @2 }
代码@1:若是transactionListener为空,则直接抛出异常。
代码@2:调用defaultMQProducerImpl的sendMessageInTransaction方法。
DefaultMQProducerImpl#sendMessageInTransaction服务器
public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener tranExecuter, final Object arg) throws MQClientException {
Step1:首先先阐述一下参数含义。final Message msg:消息;TransactionListener tranExecuter:事务监听器; Object arg:其余附加参数,该参数会再TransactionListener 回调函数中原值传入。
DefaultMQProducerImpl#sendMessageInTransaction微信
SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
Step2:在消息属性中,添加两个属性:TRAN_MSG,其值为true,表示为事务消息;PGROUP:消息所属发送者组,而后以同步方式发送消息。
在消息发送以前,会先检查消息的属性TRAN_MSG,若是存在而且值为true,则经过设置消息系统标记的方式,设置消息为MessageSysFlag.TRANSACTION_PREPARED_TYPE。
DefaultMQProducerImpl#sendKernelImplsvg
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } SendMessageProcessor#sendMessage String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }
Step3:Broker端首先客户发送消息请求后,判断消息类型,若是是事务消息,则调用TransactionalMessageService#prepareMessage方法,不然走原先的逻辑,调用MessageStore#putMessage方法。函数
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); }
step4:事务消息,将调用TransactionalMessageServiceImpl#prepareMessage方法,继而调用TransactionalMessageBridge#prepareMessage方法。源码分析
TransactionalMessageBridge#parseHalfMessageInner public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) { return store.putMessage(parseHalfMessageInner(messageInner)); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
Step5:备份消息的原主题名称与原队列ID,而后取消是事务消息的消息标签,从新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。而后调用MessageStore#putMessage方法将消息持久化,这里TransactionalMessageBridge桥接类,就是封装事务消息的相关流程,最终调用MessageStore完成消息的持久化。消息入库后,会继续回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是经过同步将消息发送到消息服务端。
DefaultMQProducerImpl#sendMessageInTransactionui
switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
Step6:若是消息发送成功,会回调TransactionListener#executeLocalTransaction方法,执行本地事务,而且返回本地事务状态为:public enum LocalTransactionState {COMMIT_MESSAGE,ROLLBACK_MESSAGE,
UNKNOW,} 之一,注意:TransactionListener#executeLocalTransaction是在发送者成功发送PREPARED消息后,会执行本地事务方法,而后返回本地事务状态;若是PREPARED消息发送失败,则不会调用
TransactionListener#executeLocalTransaction,而且本地事务消息,设置为
LocalTransactionState.ROLLBACK_MESSAGE,表示消息须要被回滚。
DefaultMQProducerImpl#sendMessageInTransactionthis
try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); }
step7:调用endTransaction方法结束事务(提交或回滚)。
DefaultMQProducerImpl#endTransaction.net
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId());
step8:组装结束事务请求,主要参数为:事务ID、事务操做(commitOrRollback)、消费组、消息队列偏移量、消息ID,fromTransactionCheck,从这里发出的请求,默认为false。Broker端的请求处理器为:EndTransactionProcessor。
step9:EndTransactionProcessor根据事务提交类型:TRANSACTION_COMMIT_TYPE(提交事务)、TRANSACTION_ROLLBACK_TYPE(回滚事务)、TRANSACTION_NOT_TYPE、忽略该请求,会记录info级别的日志相关的代码将在下文详细分析,在这里,咱们先大概梳理一条消息发送的路径TransactionMQProducer#sendMessageInTransaction的调用链来总结一下事务消息的发送流程。
本文到这里,初步展现了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,而后在将该消息成功存入到消息服务器后,会回调 TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,而后根据该方法的返回值,结束事务:
一、COMMIT_MESSAGE :提交事务。
二、ROLLBACK_MESSAGE:回滚事务。
三、UNKNOW:未知事务状态,此时消息服务器(Broker)收到EndTransaction命令时,将不对这种消息作处理,消息还处于Prepared类型,存储在主题为:RMQ_SYS_TRANS_HALF_TOPIC的队列中,而后消息发送流程将结束,那这些消息如何提交或回滚呢?为了实现避免客户端须要再次发送提交、回滚命令,RocketMQ会采起定时任务将RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,而后回到客户端,判断该消息是否须要提交或回滚,来完成事务消息的声明周期,该部份内容将在下节重点探讨。
事务消息后续文章预告:
一、事务消息状态会查机制实现。
二、消息服务端在收到客户端的回滚、提交命令时,若是高效处理事务消息的提交、回滚动做。
三、事务消息实战。
欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:
一、源码分析RocketMQ专栏(40篇+)
二、源码分析Sentinel专栏(12篇+)
三、源码分析Dubbo专栏(28篇+)
四、源码分析Mybatis专栏
五、源码分析Netty专栏(18篇+)
六、源码分析JUC专栏
七、源码分析Elasticjob专栏
八、Elasticsearch专栏(20篇+)
九、源码分析MyCat专栏