RocketMQ发布了4.3.0版本,New Feature中最受关注的一点就是支持了事务消息。java
RocketMQ做为一款消息中间件,主要做用就是帮助各个系统进行业务解耦,以及对消息流量有削峰填谷的做用,而对于事务消息,主要是经过消息的异步处理,能够保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,这里咱们先看看一条事务消息从诞生到结束的整个时间线流程:
其中:web
从上述流程能够看到事务消息其实只是保证了生产者发送消息成功与本地执行事务的成功的一致性,消费者在消费事务消息时,broker处理事务消息的消费与普通消息是同样的,若消费不成功,则broker会重复投递该消息16次,若仍然不成功则须要人工介入。数据库
事务消息的成功投递是须要经历三个Topic的,分别是:网络
理解清楚事务消息在这三个Topic的流转就基本理解清楚了RocketMQ的事务消息的处理。接下来咱们看看在源码中是如何使用这三个Topic的。异步
1、在sendMessageInTransaction方法中,主要有:分布式
send方法以同步方式调用sendDefaultImpl方法。ide
2、sendDefaultImpl方法的做用主要用:svg
3、sendKernelImpl方法主要设置了消息的TRANSACTION_PREPARED_TYPE标志以及调用MQClientAPIImpl的sendMessage方法。ui
4、最终会调用到通讯层的RemotingClient类进行消息的发送,并接收broker的响应。this
5、收到响应后返回到sendMessageInTransaction方法中执行后序的逻辑:
6、收到响应后返回到sendMessageInTransaction方法中执行后序的逻辑:
7、在第6步中返回TransactionSendResult以前,会调用this.endTransaction(sendResult, localTransactionState, localException)方法,该方法的做用就是向broker返回本地事务状态。
//判断是不是事务消息 若是是事务消息则用事务消息的逻辑处理 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) { return store.putMessage(parseHalfMessageInner(messageInner)); }
/** * 将消息进行转换,最终将消息存储到统一处理事务的Topic中:RMQ_SYS_TRANS_HALF_TOPIC * @return 转换后的消息 */ private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { //将消息所属真正Topic存储到消息的properties中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); //将消息应该写的queue存储到消息的properties中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); //设置事务消息标志:Unknow,由于如今尚未接收到该事务消息的状态 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); //设置消息存储到的Topic:统一事务消息Topic:RMQ_SYS_TRANS_HALF_TOPIC msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); //全部事务消息存放在该Topic的第一个队列里 msgInner.setQueueId(0); //将其他该消息的属性统一存放进来 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
生产者在发送prepare消息后—>执行本地事务逻辑—>broker接收请求结束本次事务状态:Broker在接收请求后根据命令会执行EndTransactionProcessor的processRequest方法,该方法中下面的逻辑是真正处理事务消息状态的:
OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 获取Half Topic中的prepare消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { // 校验消息是否正确:Half中的该消息是否是真正的本次请求处理的消息 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 将prepare消息转换为原消息,该消息的Topic就是真正消息的Topic MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); //将消息发送到真正的Topic里,该消息能够开始下发给消费者 RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { //将消息放入Op Half Topic this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { //同commitMessage方法同样,返回真正的操做的消息:将Half Topic中的该消息还原为原消息 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { //将消息放入Op Half Topic this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } }
该方法会判断本次事务的最终状态,若是是Commit:
若是落盘成功,则删除prepare消息,实际上是将消息写入到Op Topic里,该消息的内容就是这条消息在Half Topic队列里的offset,缘由见后面的分析
若是是Rollback,则直接将消息转换为原消息,并写入到Op Topic里。
在RocketMQ中,消息都是顺序写随机读的,以offset来记录消息的存储位置与消费位置,因此对于事务消息的prepare消息来讲,不可能作到物理删除,broker启动时每间隔60s会开始检查一下有哪些prepare消息须要回查,从上面的分析咱们知道,全部prepare消息都存储在Half Topic中,那么如何从该Topic中取出须要回查的消息进行回查呢?这就须要Op Half Topic以及一个内部的消费进度计算出须要回查的prepare消息进行回查:
broker在启动时会启动线程回查的服务,在TransactionMessageCheckService的run方法中,该方法会执行到onWaitEnd方法:
@Override protected void onWaitEnd() { //获取超时时间 6s long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); //获取最大检测次数 15次 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); //获取当前时间 long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); //开始检测 this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
该方法的最后会执行到TransactionMessageServiceImpl的check方法,该方法就是真正执行事务回查检测的方法,该方法的主要做用就是计算出须要回查的prepare消息进行事务回查,大体逻辑是:
生产客户端的ClientRemotingProcessor的processRequest方法会处理服务端的CHECK_TRANSACTION_STATE请求,最后会调用checkLocalTransactionState方法,该方法就是业务方能够本身实现事务消息回查逻辑的地方,并将结果最后用endTransactionOneway方法返回给Broker,该执行逻辑能够经过ClientRemotingProcessor的方法processRequest依次理解就能够了。
经过上面的文章,能够大体了解事务消息的实现,咱们能够知道,事务消息主要有三个状态:
那么监控事务消息时,主要是查看该事务消息是不是处于咱们想要的状态,而在事务消息生产者发送prepare消息成功后只能拿到一个transactionId,该id不是的RocketMQ消息存储的物理offset地址,RocketMQ只有在准备写入commitlog文件时才会生成真正的msgId,而这里能够获取的transactionId和msgId都是客户端生成的一个消息的惟一标识符,咱们在这里称为uniqId,在broker端,会把该uniqId做为一个msgKey写入消息,因此能够经过该uniqId来查找uniqId的一些状态:
经过上述三步就能够肯定事务消息的状态。
事务消息的异常状态主要有:
对于1:事务消息会根据producerGroup搜寻其余的生产者实例进行回查,因此transactionId务必保存在中央存储中,而且事务消息的pid不能跟其余消息的pid混用。
对于2:当前实例会搜寻其余的可用的broker-master进行提交,由于只有提交prepare消息后才会执行本地事务,因此没有影响,注意生产者报的是超时异常时,是不会进行重发的。
对于3:由于返回状态是oneway方式,此时若是消费者未收到消息,须要用手段肯定该事务消息的状态,尽快将broker重启,broker重启后会经过回查完成事务消息。
对于4:同3,尽快重启broker。
生产者发送消息
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name"); //建立线程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), (r) -> { Thread thread = new Thread(r); thread.setName("text_tx_producer_group_name" + "-check-thread"); return thread; }); producer.setNamesrvAddr(Const.NAMESRV_ADDR); producer.setExecutorService(executorService); //这个对象主要作两件事,第一件事情 就是 执行本地事物 第二件事情就是作回查 TransactionListener transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); producer.start(); Message message = new Message("test_tx_topic", " TagA", "key", ("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendMessageInTransaction(message, "我是回调的参数"); Thread.sleep(Integer.MAX_VALUE); } }
事件监听类
public class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { String callArg = (String)arg; System.out.println("callArg:" + callArg); //开始事物 // 数据库的落库操做 //事物结束 return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("回调事物检查" + messageExt); return LocalTransactionState.COMMIT_MESSAGE; } }
消费者
public class TransactionConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tx_consumer_group_name"); consumer.setConsumeThreadMax(20); consumer.setConsumeThreadMin(10); consumer.setNamesrvAddr(Const.NAMESRV_ADDR); //从尾部消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("test_quick_topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); try { System.out.println("收到事物消息" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }