RocketMQ 事务消息

    RocketMQ将事务拆分红小事务异步执行的方式来执行。
    RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段经过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会按期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,RocketMQ会根据发送端设置的策略来决定是回滚仍是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
RocketMQ事务消息:


TransactionCheckListenerImpl:java

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Created by Aaron Sheng on 10/19/16.
 * TransactionCheckListenerImpl handle transaction unsettled.
 * Broker will notify producer to check local transaction.
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {

    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        System.out.println("checkLocalTransactionState");
        System.out.println("topic: " + messageExt.getTopic());
        System.out.println("body: " + messageExt.getBody());

        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}


TransactionExecuterImpl:异步

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Aaron Sheng on 10/19/16.
 * TransactionExecuterImpl executre local trancation and return result to broker.
 */
public class TransactionExecuterImpl implements LocalTransactionExecuter {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
        System.out.println("executeLocalTransactionBranch " + message.toString());

        int value = transactionIndex.getAndIncrement();
        if ((value % 3) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if ((value % 3) == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else{
            return LocalTransactionState.UNKNOW;
        }
    }
}


TransactionProducer:ide

package aaron.mq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by Aaron Sheng on 10/19/16.
 */
public class TransactionProducer {
    public static void produce() throws MQClientException {
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("TxProducer");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(4);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("TxProducer-instance1");
        producer.setVipChannelEnabled(false);
        producer.start();

        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        try {
            for (int i = 0; i < 1000; i++) {
                Message msg = new Message("Topic1",
                        "Tag1",
                        "OrderId" + i,
                        ("Body" + i).getBytes());
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.println(sendResult);
                Thread.sleep(1000);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}


RocketMQConsumer:atom

package aaron.mq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by Aaron Sheng on 10/17/16.
 */
public class RocketMQConsumer {
    public static void consume() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setVipChannelEnabled(false);
        consumer.setInstanceName("rmq-instance");
        consumer.subscribe("Topic1", "Tag1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getKeys() + " " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}