RocketMQ核心概念

名词概念简介

消息生产者(producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

  • 选择队列策略:已发送消息数量队queue count取mod
  • 消息内容全部存储在commit log中,queue log记录每条消息再commit log中存储的位置等信息

发送结果返回值:

  • SEND_OK:消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
  • FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
  • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
  • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器ASYNC_MASTER),但没有配置slaveBroker服务器,则将返回该状态——无Slave服务器可用。

消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费(pull consumer)、推动式消费(push consumer)。
主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
拉取式消费(Pull Consumer):Consumer消费的一种类型,应用通常主动调Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费(Push Consumer):Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
集群消费(Clustering):集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费(Broadcasting):广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
普通顺序消息(Normal Ordered Message):普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息(Strictly Ordered Message):严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
消息(Message):消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
全局有序:如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个。
分区有序如果多个queue参与,即相对每个queue,消息都是有序的。
延时消息:定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
使用场景:如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
批量消息:批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应
超过4MB。rocketmq建议每次批量消息大小大概在1MB。当消息大小超过4MB时,需要将消息进行分割
过滤消息:大多数情况下,可以通过TAG来选择您想要的消息
在这里插入图片描述

事务消息:消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队
列 MQ 事务消息能达到分布式事务的最终一致。

  • 暂不能投递的消息,发送方已经成功地将消息发送到了消息队列
    MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记
    成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确
    认丢失,消息队列 MQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要
    主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即
    消息回查。默认15次
  • 通过购物车进行下单的流程中,用户入口在购物车系统,交易下单入口在交易系统,两
    个系统之间的数据需要保持最终一致,这时可以通过事务消息进行处理。交易系统下单之后,发送一条交易下单的消息到消息队列 MQ,购物车系统订阅消息队列 MQ 的交易下单消息,做相应的业务处理,更新购物车数据。如果还有其他业务可以设置另一个消费组消费交易下单消息

在这里插入图片描述
消费位点
CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。
CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用
CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

架构简介

Broker架构

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
** Client Manager**:负责管理客户端(Producer/Consumer)和维护Consumer的 Topic订阅信息
Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同 步功能。Index Service:根据特定的Message key对投递到Broker的消息进行索引服 务,以提供消息的快速查询。

消息存储整体架构

在这里插入图片描述
CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容, 消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始
偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始 偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一 个文件;

ConsumeQueue:保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset, 消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的 commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层 组织结构,具体存储路径为: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文 件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节 的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访 问每一个条目,每个ConsumeQueue文件大小约5.72M

IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方 法。Index文件的存储位置是:KaTeX parse error: Undefined control sequence: \store at position 6: HOME \̲s̲t̲o̲r̲e̲\index{fileName},文件名fileName是以 创建时的时间戳命名的,固定的单个IndexFile文件大小:40+500W4+2000W20= 420000040个字节大小,约为400M,一个IndexFile可以保存 2000W个索引,IndexFile 的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现 为hash索引。

消息刷盘

同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真 正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的 保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成 功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延 迟,提高了MQ的性能和吞吐量。

零拷贝刷盘

正常情况文件下载为例,服务端的主要任务是:将服务端主机磁盘中的文件不做修改地从已连 接的socket发出去。操作系统底层I/O过程如下图所示
在这里插入图片描述过程共产生了四次数据拷贝,在此过程中,我们没有对文件内容做任何修改,那么在内核空 间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性。

零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,主要就是利用 各种零拷贝技术,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件 来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源 的利用更加有效。 原理是磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲 区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用 write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核 态,最后,socket缓冲区再把数据发到网卡去。
在这里插入图片描述