RocketMQ通过将事务拆分成小事务并异步执行的方式来执行事务。
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
RocketMQ事务消息:
TransactionCheckListenerImpl:
package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;

/**
 * Created by Aaron Sheng on 10/19/16.
 *
 * <p>Handles transactions whose outcome is still unsettled: the broker notifies the
 * producer to check the local transaction, and this listener supplies the answer.
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {

    /**
     * Logs the message being checked and reports its local transaction as rolled back.
     *
     * @param messageExt the message whose local transaction state is being checked
     * @return always {@link LocalTransactionState#ROLLBACK_MESSAGE}
     */
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        System.out.println("checkLocalTransactionState");
        System.out.println("topic: " + messageExt.getTopic());

        // Decode the payload before printing: concatenating the byte[] directly would
        // print only the array's identity (e.g. "[B@1b6d3586"), not its content.
        byte[] body = messageExt.getBody();
        String bodyText = (body == null) ? "null" : new String(body, StandardCharsets.UTF_8);
        System.out.println("body: " + bodyText);

        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
TransactionExecuterImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }
TransactionProducer:
package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
RocketMQConsumer:
package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }