RocketMQ-事务消息

一、事务消息实现方式

       应用使用事务消息的步骤:

      (1)应用发送消息,使用prepare字段标示准备消息

      (2)应用执行本地业务逻辑

      (3)应用发送事务提交或回滚消息

         broker收到prepare消息后会将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0,然后写入commitlog,原有topic与queueid信息写入REAL_TOPIC与REAL_QID字段中,因为没有投递到真实的业务主题的队列上,下游系统并不会感知到消息的存在。

       当broker收到commit消息后,会查出原有消息体,将REAL_TOPIC与REAL_QID信息写入topic和queueId,再回写到commitlog中,此时下游系统便可以进行消费。broker会向commitlog写入一条表示prepare消息已经完成的消息,topic是RMQ_SYS_TRANS_OP_HALF_TOPIC,queueId与RMQ_SYS_TRANS_HALF_TOPIC中消息的queueId一致。

二、事务回查机制

     消息写入commitlog后,不能删除也不可更改,broker为确定prepare消息的状态,需要一种回查机制,不断检查prepare消息队列来确定,主要通过事务回查线程来完成(TransactionalMessageCheckService)。

     RMQ_SYS_TRANS_HALF_TOPIC队列保存准备消息,处理进度用prepareOffset表示,RMQ_SYS_TRANS_OP_HALF_TOPIC保存已经完成的准备消息,处理进度用opOffset表示,两个消息队列如下如所示。             

     在RMQ_SYS_TRANS_OP_HALF_TOPIC中从opOffset开始先查出32个消息放在opMsgList中,消息内容是RMQ_SYS_TRANS_HALF_TOPIC中 对应msg的offset。

    (1)回查线程从prepareOffset开始遍历prepre消息列表,如果消息偏移量在opMsgList中,则表示该消息已经完成,prepareOffset向下移动。

   (2)如果消息偏移量不在opMsgList中,并且消息被回查次数未超过最大次数,则会构建新的消息重新写入commitlog,留作以后回查。如果超过回查次数,则直接pass。注意:重新写入commitlog的策略有很多,不止一种。

  (3)顺序更新opOffset

 

总结:此中回查机制实时性比较差,如果某条消息一致未发送完成消息,则回查机制需要不断回查,直到满足回查次数或超时。