在分布式系统中,咱们时常会遇到分布式事务的问题,除了常规的解决方案以外,咱们还能够利用RocketMQ的事务性消息来解决分布式事务的问题。RocketMQ和其余消息中间件最大的一个区别是支持了事务消息,这也是分布式事务里面的基于消息的最终一致性方案。html
这里可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到broker上失败了怎么办?这个时候意味着消费者没法正常消费到这个消息。因此RocketMQ提供了消息回查机制,若是事务消息一直处于中间状态,broker会发起重试去查询broker上这个事务的处理状态。一旦发现事务处理成功,则把当前这条消息设置为可见。mysql
生产者producer:sql
public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException { TransactionMQProducer transactionMQProducer=new TransactionMQProducer("tx_producer"); transactionMQProducer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); ExecutorService executorService= Executors.newFixedThreadPool(10); transactionMQProducer.setExecutorService(executorService); transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事务的监听 transactionMQProducer.start(); for(int i=0;i<10;i++){ String orderId= UUID.randomUUID().toString(); String body="{'operation':'doOrder','orderId':'"+orderId+"'}"; Message message=new Message("testTopic2", null,orderId,body.getBytes(RemotingHelper.DEFAULT_CHARSET)); transactionMQProducer.sendMessageInTransaction(message,orderId); Thread.sleep(1000); } } }
TransactionListenerLocal:数据库
public class TransactionListenerLocal implements TransactionListener { private Map<String,Boolean> results=new ConcurrentHashMap<>(); //执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("开始执行本地事务:"+o.toString()); //o String orderId=o.toString(); //模拟数据库保存(成功/失败) boolean result=Math.abs(Objects.hash(orderId))%2==0; if(!result) { results.put(orderId, result); // } return result? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } //提供给事务执行状态检查的回调方法,给broker用的(异步回调) //若是回查失败,消息就丢弃 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { String orderId=messageExt.getKeys(); System.out.println("执行事务回调检查: orderId:"+orderId); if(results.size()==0){ return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } }
消费端 consumer:架构
public class TransactionConsumer { //rocketMQ 除了在同一个组和不一样组之间的消费者的特性和kafka相同以外 //RocketMQ能够支持广播消息,就意味着,同一个group的每一个消费者均可以消费同一个消息 public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer= new DefaultMQPushConsumer("tx_consumer"); defaultMQPushConsumer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //subExpression 能够支持sql的表达式. or and a=? ,,, defaultMQPushConsumer.subscribe("testTopic2","*"); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.stream().forEach(message->{ System.out.println("开始业务处理逻辑:消息体:"+new String(message.getBody())+"->key:"+message.getKeys()); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //签收 } }); defaultMQPushConsumer.start(); } }
RocketMQ事务消息的三种状态:dom
当executeLocalTransaction方法返回ROLLBACK_MESSAGE时,表示直接回滚事务,当返回COMMIT_MESSAGE提交事务当返回UNKNOW时,Broker会在一段时间以后回查checkLocalTransaction,根据checkLocalTransaction返回状态执行事务的操做(回滚或提交),如示例中,当返回ROLLBACK_MESSAGE时消费者不会收到消息,且不会调用回查函数,当返回COMMIT_MESSAGE时事务提交,消费者收到消息,当返回UNKNOW时,在一段时间以后调用回查函数,并根据status判断返回提交或回滚状态,返回提交状态的消息将会被消费者消费,因此此时消费者能够消费部分消息异步
因为分布式消息队列对于可靠性的要求比较高,因此须要保证生产者将消息发送到broker以后,保证消息是不出现丢失的,所以消息队列就少不了对于可靠性存储的要求分布式
从主流的几种MQ消息队列采用的存储方式来看,主要会有三种ide
消息的存储结构:函数
RocketMQ就是采用文件系统的方式来存储消息,消息的存储是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存储文件。ConsumeQueue是消息的逻辑队列,有点相似于数据库的索引文件,里面存储的是指向CommitLog文件中消息存储的地址。每一个Topic下的每一个Message Queue都会对应一个ConsumeQueue文件,文件的地址是:${store_home}/consumequeue/${topicNmae}/${queueId}/${filename}, 默认路径: /root/store在rocketMQ的文件存储目录下,能够看到这样一个结构的的而文件。
CommitLog:
CommitLog是用来存放消息的物理文件,每一个broker上的commitLog本当前机器上的全部consumerQueue共享,不作任何的区分。CommitLog中的文件默认大小为1G,能够动态配置; 当一个文件写满之后,会生成一个新的commitlog文件。全部的Topic数据是顺序写入在CommitLog文件中的。文件名的长度为20位,左边补0,剩余未起始偏移量,好比00000000000000000000 表示第一个文件, 文件大小为102410241024,当第一个文件写满以后,生成第二个文件000000000001073741824 表示第二个文件,起始偏移量为1073741824。
ConsumeQueue:
consumeQueue表示消息消费的逻辑队列,这里面包含MessageQueue在commitlog中的其实物理位置偏移量offset,消息实体内容的大小和Message Tag的hash值。对于实际物理存储来讲,consumeQueue对应每一个topic和queueid下的文件,每一个consumeQueue类型的文件也是有大小,每一个文件默认大小约为600W个字节,若是文件满了后会也会生成一个新的文件。
IndexFile:
索引文件,若是一个消息包含Key值的话,会使用IndexFile存储消息索引。Index索引文件提供了对CommitLog进行数据检索,提供了一种经过key或者时间区间来查找CommitLog中的消息的方法。在物理存储中,文件名是以建立的时间戳明明,固定的单个IndexFile大小大概为400M,一个IndexFile能够保存2000W个索引。
abort:
broker在启动的时候会建立一个空的名为abort的文件,并在shutdown时将其删除,用于标识进程是否正常退出,若是不正常退出,会在启动时作故障恢复。
Config:
能够看到这个里面保存了 消费端consumer的偏移量:
以及topic的一些配置信息:
RocketMQ的消息存储采用的是混合型的存储结构,也就是Broker单个实例下的全部队列公用一个日志数据文件CommitLog。这个是和Kafka又一个不一样之处。为何不采用kafka的设计,针对不一样的partition存储一个独立的物理文件呢?这是由于在kafka的设计中,一旦kafka中Topic的Partition数量过多,队列文件会过多,那么会给磁盘的IO读写形成比较大的压力,也就形成了性能瓶颈。因此RocketMQ进行了优化,消息主题统一存储在CommitLog中。固然它也有它的优缺点
1. Producer将消息发送到Broker后,Broker会采用同步或者异步的方式把消息写入到CommitLog。RocketMQ全部的消息都会存放在CommitLog中,为了保证消息存储不发生混乱,对CommitLog写以前会加锁,同时也可使得消息可以被顺序写入到CommitLog,只要消息被持久化到磁盘文件CommitLog,那么就能够保证Producer发送的消息不会丢失。
2. commitLog持久化后,会把里面的消息Dispatch到对应的Consume Queue上,Consume Queue至关于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
3. 当消费者进行消息消费时,会先读取consumerQueue , 逻辑消费队列ConsumeQueue保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值
4. 直接从consumequeue中读取消息是没有数据的,真正的消息主体在commitlog中,因此还须要从commitlog中读取消息
何时清理物理消息文件?那消息文件到底删不删,何时删?
消息存储在CommitLog以后,的确是会被清理的,可是这个清理只会在如下任一条件成立才会批量删除消息文件(CommitLog):
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。