RocketMQ学习(九)——RocketMQ事务消息

RocketMQ发布了4.3.0版本,New Feature中最受关注的一点就是支持了事务消息。java

基础概念

  • 事务消息:MQ 提供相似 X/Open XA 的分布事务功能,经过 MQ 事务消息能达到分布式事务的最终一致。
  • 半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,可是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
    消息回查:因为网络闪断、生产者应用重启等缘由,致使某条事务消息的二次确认丢失,MQ 服务端经过扫描发现某条消息长期处于“半消息”时,须要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

实现原理

RocketMQ做为一款消息中间件,主要做用就是帮助各个系统进行业务解耦,以及对消息流量有削峰填谷的做用,而对于事务消息,主要是经过消息的异步处理,能够保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,这里咱们先看看一条事务消息从诞生到结束的整个时间线流程:
在这里插入图片描述
其中:web

  1. 生产者发送消息到broker,该消息是prepare消息,且事务消息的发送是同步发送的方式。
  2. broker接收到消息后,会将该消息进行转换,全部的事务消息统一写入Half Topic,该Topic默认是RMQ_SYS_TRANS_HALF_TOPIC ,写入成功后会给生产者返回成功状态。
  3. 本地生产获取到该消息的事务Id,进行本地事务处理。
  4. 本地事务执行成功提交Commit,失败则提交Rollback,超时提交或提交Unknow状态则会触发broker的事务回查。
  5. 若提交了Commit或Rollback状态,Broker则会将该消息写入到Op Topic,该Topic默认是RMQ_SYS_TRANS_OP_HALF_TOPIC,该Topic的做用主要记录已经Commit或Rollback的prepare消息,Broker利用Half Topic和Op Topic计算出须要回查的事务消息。若是是commit消息,broker还会将消息从Half取出来存储到真正的Topic里,从而消费者能够正常进行消费,若是是Rollback则不进行其余操做。
  6. 若是本地事务执行超时或返回了Unknow状态,则broker会进行事务回查。若生产者执行本地事务超过6s则进行第一次事务回查,总共回查15次,后续回查间隔时间是60s,broker在每次回查时会将消息再在Half Topic写一次。回查次数和时间间隔都是可配置的。
  7. 执行事务回查时,生产者能够获取到事务Id,检查该事务在本地执行状况,返回状态同第一次执行本地事务同样。

从上述流程能够看到事务消息其实只是保证了生产者发送消息成功与本地执行事务的成功的一致性,消费者在消费事务消息时,broker处理事务消息的消费与普通消息是同样的,若消费不成功,则broker会重复投递该消息16次,若仍然不成功则须要人工介入。数据库

事务消息的成功投递是须要经历三个Topic的,分别是:网络

  • Half Topic:用于记录全部的prepare消息
  • Op Half Topic:记录已经提交了状态的prepare消息
  • Real Topic:事务消息真正的Topic,在Commit后会才会将消息写入该Topic,从而进行消息的投递

理解清楚事务消息在这三个Topic的流转就基本理解清楚了RocketMQ的事务消息的处理。接下来咱们看看在源码中是如何使用这三个Topic的。异步

生产发送prepare消息

在这里插入图片描述
1、在sendMessageInTransaction方法中,主要有:分布式

  1. 调用Validators.checkMessage(msg, this.defaultMQProducer)校验事务消息的合法性
  2. 对消息设置PROPERTY_TRANSACTION_PREPARED与PROPERTY_PRODUCER_GROUP属性,前者用于判断该消息是prepare消息,后者主要在回查时须要用到。
  3. 调用DefaultMQProducerImpl的send方法进行发送。

send方法以同步方式调用sendDefaultImpl方法。ide

2、sendDefaultImpl方法的做用主要用:svg

  1. 获取对应的Topic的路由
  2. 轮询获取须要发送的队列(在3.2.6版本中这里轮询有个Bug~,轮询次数超过Integer.MAX时会开始报错)
  3. 调用sendKernelImpl进行消息发送

3、sendKernelImpl方法主要设置了消息的TRANSACTION_PREPARED_TYPE标志以及调用MQClientAPIImpl的sendMessage方法。ui

4、最终会调用到通讯层的RemotingClient类进行消息的发送,并接收broker的响应。this

5、收到响应后返回到sendMessageInTransaction方法中执行后序的逻辑:

6、收到响应后返回到sendMessageInTransaction方法中执行后序的逻辑:

  1. 判断响应状态,若是是SNED_OK,就执行 transactionListener.executeLocalTransaction(msg, arg)方法来执行本地事务逻辑
  2. 若是是其余状态,对该消息进行回滚:返回RALLBACK状态
  3. 构造TransactionSendResult对象并返回。

7、在第6步中返回TransactionSendResult以前,会调用this.endTransaction(sendResult, localTransactionState, localException)方法,该方法的做用就是向broker返回本地事务状态。

Broker处理prepare消息

在这里插入图片描述

  1. NettyServerHandler类的processMessageReceived 方法是全部broker请求的入口,该方法会调用NettyRemotingAbstract方法的processMessageReceived方法。
  2. NettyRemotingAbstract的processMessageReceived经过命令模式根据cmd的code获取到对应的processor进行请求的处理,事务prepare消息对应的processor是SendMessageProcessor
  3. SendMessageProcessor的processRequest方法会根据判断是不是批量消息,事务的prepare消息是单条的,调用其sendMessage方法,该方法中有一块单独处理事务消息的逻辑:
//判断是不是事务消息 若是是事务消息则用事务消息的逻辑处理
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
  1. 对于prepare消息会调用TransactionMessageBridge的putHalfMessage方法,该方法调用parseHalfMessageInner对prepare消息进行转换并在转换后进行消息的存储:
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}
/** * 将消息进行转换,最终将消息存储到统一处理事务的Topic中:RMQ_SYS_TRANS_HALF_TOPIC * @return 转换后的消息 */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //将消息所属真正Topic存储到消息的properties中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //将消息应该写的queue存储到消息的properties中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //设置事务消息标志:Unknow,由于如今尚未接收到该事务消息的状态
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //设置消息存储到的Topic:统一事务消息Topic:RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //全部事务消息存放在该Topic的第一个队列里
    msgInner.setQueueId(0);
    //将其他该消息的属性统一存放进来
  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
  1. 能够看到全部的prepare消息都是存储在一个Topic中的一个队列里,该Topic就是上面的Half Topic,最后会对消息进行存储逻辑的操做,并调用handlePutMessageResult构造返回结果返回给生产者。

Broker结束事务消息

在这里插入图片描述
生产者在发送prepare消息后—>执行本地事务逻辑—>broker接收请求结束本次事务状态:Broker在接收请求后根据命令会执行EndTransactionProcessor的processRequest方法,该方法中下面的逻辑是真正处理事务消息状态的:

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 获取Half Topic中的prepare消息
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 校验消息是否正确:Half中的该消息是否是真正的本次请求处理的消息
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 将prepare消息转换为原消息,该消息的Topic就是真正消息的Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            //将消息发送到真正的Topic里,该消息能够开始下发给消费者
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                //将消息放入Op Half Topic
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    //同commitMessage方法同样,返回真正的操做的消息:将Half Topic中的该消息还原为原消息
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            //将消息放入Op Half Topic
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

该方法会判断本次事务的最终状态,若是是Commit:

  • 获取Half Topic中的消息
  • 将该消息转换为原消息
  • 将消息写入到真正的Topic里,这里是事务消息的真正落盘,从而消息能够被消费者消费到

若是落盘成功,则删除prepare消息,实际上是将消息写入到Op Topic里,该消息的内容就是这条消息在Half Topic队列里的offset,缘由见后面的分析

若是是Rollback,则直接将消息转换为原消息,并写入到Op Topic里。

事务消息是如何处理回查的

在RocketMQ中,消息都是顺序写随机读的,以offset来记录消息的存储位置与消费位置,因此对于事务消息的prepare消息来讲,不可能作到物理删除,broker启动时每间隔60s会开始检查一下有哪些prepare消息须要回查,从上面的分析咱们知道,全部prepare消息都存储在Half Topic中,那么如何从该Topic中取出须要回查的消息进行回查呢?这就须要Op Half Topic以及一个内部的消费进度计算出须要回查的prepare消息进行回查:

  • Half Topic 默认Topic是RMQ_SYS_TRANS_HALF_TOPIC,建一个队列,存储全部的prepare消息
  • Op Half Topic默认是RMQ_SYS_TRANS_OP_HALF_TOPIC,创建的对列数与Half Topic相同,存储全部已经肯定状态的prepare消息(rollback与commit状态),消息内容是该条消息在Half Topic的Offset
  • Half Topic消费进度,默认消费者是CID_RMQ_SYS_TRANS,每次取prepare消息判断回查时,从该消费进度开始依次获取消息。
  • Op Half Topic消费进度,默认消费者是CID_RMQ_SYS_TRANS,每次获取prepare消息都须要判断是否在Op Topic中已存在该消息了,若存在表示该prepare消息已结束流程,不须要再进行事务回查,每次判断都是从Op Topic中获取必定消息数量出来进行对比的,获取的消息就是从Op Topic中该消费进度开始获取的,最大一次获取32条。

在这里插入图片描述
broker在启动时会启动线程回查的服务,在TransactionMessageCheckService的run方法中,该方法会执行到onWaitEnd方法:

@Override
protected void onWaitEnd() {
    //获取超时时间 6s
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    //获取最大检测次数 15次
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    //获取当前时间
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    //开始检测
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

该方法的最后会执行到TransactionMessageServiceImpl的check方法,该方法就是真正执行事务回查检测的方法,该方法的主要做用就是计算出须要回查的prepare消息进行事务回查,大体逻辑是:

  1. 获取Half Topic的全部队列,循环队列开始检测须要获取的prepare消息,实际上Half Topic只有一个队列。
  2. 获取Half Topic与Op Half Topic的消费进度。
  3. 调用fillOpRemoveMap方法,获取Op Half Topic中已完成的prepare事务消息。
  4. 从Half Topic中当前消费进度依次获取消息,与第3步获取的已结束的prepare消息进行对比,判断是否进行回查:
  5. 若是Op消息中包含该消息,则不进行回查,
  6. 若是不包含,获取Half Topic中的该消息,判断写入时间是否符合回查条件,如果新消息则不处理下次处理,并将消息从新写入Half Topic,判断回查次数是否小于15次,写入时间是否小于72h,若是不知足就丢弃消息,若知足则更新回查次数,并将消息从新写入Half Topic并进行事务回查,
  7. 在循环完后从新更新Half Topic与Op Half Topic中的消费进度,下次判断回查逻辑时,将从最新的消费进度获取信息。

生产客户端的ClientRemotingProcessor的processRequest方法会处理服务端的CHECK_TRANSACTION_STATE请求,最后会调用checkLocalTransactionState方法,该方法就是业务方能够本身实现事务消息回查逻辑的地方,并将结果最后用endTransactionOneway方法返回给Broker,该执行逻辑能够经过ClientRemotingProcessor的方法processRequest依次理解就能够了。

咱们有哪些手段来监控事务消息的状态

经过上面的文章,能够大体了解事务消息的实现,咱们能够知道,事务消息主要有三个状态:

  • UNKNOW状态:表示事务消息未肯定,多是业务方执行本地事务逻辑时间耗时过长或者网络缘由等引发的,该状态会致使broker对事务消息进行回查,默认回查总次数是15次,第一次回查间隔时间是6s,后续每次间隔60s,
  • ROLLBACK状态,该状态表示该事务消息被回滚,由于本地事务逻辑执行失败致使
  • COMMIT状态,表示事务消息被提交,会被正确分发给消费者。

那么监控事务消息时,主要是查看该事务消息是不是处于咱们想要的状态,而在事务消息生产者发送prepare消息成功后只能拿到一个transactionId,该id不是的RocketMQ消息存储的物理offset地址,RocketMQ只有在准备写入commitlog文件时才会生成真正的msgId,而这里能够获取的transactionId和msgId都是客户端生成的一个消息的惟一标识符,咱们在这里称为uniqId,在broker端,会把该uniqId做为一个msgKey写入消息,因此能够经过该uniqId来查找uniqId的一些状态:

  1. 经过DefaultMQAdminExt的viewMessage(String topic, String msgId)方法能够消息的信息,这里topic参数是RMQ_SYS_TRANS_HALF_TOPIC ,该topic是真正的Half Topic,msgId传发送prepare消息获取的uniqId,这样能够获取prepare消息在Half Topic真正的offsetMsgId,
  2. 经过第一步获取的offsetMsgId继续调用viewMessage(String topic, String msgId)方法,可是topic是RMQ_SYS_TRANS_OP_HALF_TOPIC,这样能够获取Op Half Topic中该事务消息的状态,若是存在说明prepare消息已处理,不然可能仍在回查中或已被丢弃
  3. 若是在第二步查到了信息能够用uniqId和事务消息真正Topic继续调用viewMessage(String topic, String msgId)方法获取消息真正的信息,若是存在说明消息已被投递,不然该事务消息已被回滚。只经过Op Half Topic是不能肯定消息状态的,这里的sysFlag被设置0,sysFlag是用于肯定事务消息状态。

经过上述三步就能够肯定事务消息的状态。

事务消息的异常恢复机制

事务消息的异常状态主要有:

  1. 生产者提交prepare消息到broker成功,可是当前生产者实例宕机了
  2. 生产者提交prepare消息到broker失败,多是由于提交的broker已宕机
  3. 生产者提交prepare消息到broker成功,执行本地事务逻辑成功,可是broker宕机了未肯定事务状态
  4. 生产提交prepare消息到broker成功,可是在进行事务回查的过程当中broker宕机了,未肯定事务状态

对于1:事务消息会根据producerGroup搜寻其余的生产者实例进行回查,因此transactionId务必保存在中央存储中,而且事务消息的pid不能跟其余消息的pid混用。

对于2:当前实例会搜寻其余的可用的broker-master进行提交,由于只有提交prepare消息后才会执行本地事务,因此没有影响,注意生产者报的是超时异常时,是不会进行重发的。

对于3:由于返回状态是oneway方式,此时若是消费者未收到消息,须要用手段肯定该事务消息的状态,尽快将broker重启,broker重启后会经过回查完成事务消息。

对于4:同3,尽快重启broker。

事物消息Demo

生产者发送消息

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
        TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");
        //建立线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000),
                (r) -> {
                    Thread thread = new Thread(r);
                    thread.setName("text_tx_producer_group_name" + "-check-thread");
                    return thread;
                });
        producer.setNamesrvAddr(Const.NAMESRV_ADDR);
        producer.setExecutorService(executorService);
        //这个对象主要作两件事,第一件事情 就是 执行本地事物 第二件事情就是作回查
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();

        Message message = new Message("test_tx_topic", " TagA", "key", ("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendMessageInTransaction(message, "我是回调的参数");
        Thread.sleep(Integer.MAX_VALUE);
    }
}

事件监听类

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        String callArg = (String)arg;
        System.out.println("callArg:" + callArg);
        //开始事物
        // 数据库的落库操做
        //事物结束
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("回调事物检查" + messageExt);
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

消费者

public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tx_consumer_group_name");

        consumer.setConsumeThreadMax(20);
        consumer.setConsumeThreadMin(10);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR);

        //从尾部消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe("test_quick_topic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = list.get(0);
                try {
                    System.out.println("收到事物消息" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}