RabbitMQ
有三种模式:单机模式、普通集群模式、镜像集群模式。
Demo
级别,一般生产不会使用。RabbitMQ
实例,每个机器启动一个。你创建的 queue
,只会放在一个 RabbitMQ
实例上,但是每个实例都同步 queue
的元数据(元数据可以认为是 queue
的一些配置信息,通过元数据,可以找到 queue
所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue
所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue
所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue
的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ
落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue
拉取数据。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue
的读写操作。
RabbitMQ
的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue
,无论元数据还是 queue
里的消息都会存在于多个实例上,就是说,每个 RabbitMQ
节点都有这个 queue
的一个完整镜像,包含 queue
的全部数据的意思。然后每次你写消息到 queue
的时候,都会自动把消息同步到多个实例的 queue
上。RabbitMQ
有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue
的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue
的完整数据,别的 consumer
都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue
负载很重,你加机器,新增的机器也包含了这个 queue
的所有数据,并没有办法线性扩展你的 queue
。
主要有以下场景会容易导致消息堆积:
channel.basicQos(int prefetchCount)
函数限制消费者的处理速率,从而导致队列中的消息堆积直到队列塞满为止。ACK
的情况下,如果消费端拒绝消息并且重回队列,且在一些极端时候,消费端持续拒绝消息就会发生消息堆积的问题。TTL
,如果先入队列的过期时间设置比较长,后面的消息过期时间设置比较短,则队列中会有很多死消息不能被及时地淘汰,从而导致消息的堆积。消息堆积容易造成队列满后的消息丢失,而且场景3可能还会出现重复消费的情况,不能保证消费消息幂等性
RabbitMQ
丢失消息主要分3种情况:
RabbitMQ
时丢失。RabbitMQ
收到消息后意外丢失。
如何避免生产者丢失数据
RabbitMQ
的事务机制是同步的,所以开启后吞吐量会下降,对性能会有很大影响一般不建议使用。confirm
模式),在生产者那里设置开启 confirm
模式之后,你每次写的消息都会分配一个唯一的 id
,然后如果写入了 RabbitMQ
中,RabbitMQ
会给你回传一个 ack
消息,告诉你说这个消息 ok
了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack
接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id
的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和 confirm
机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm
机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ
接收了之后会异步回调你的一个接口通知你这个消息接收到了。
如何避免RabbitMQ丢失数据
开启持久化机制,主要有2个步骤:
queue
的时候将其设置为持久化,这样就可以保证 RabbitMQ
持久化 queue
的元数据,但是它是不会持久化 queue
里的数据的。deliveryMode
设置为 2,就是将消息设置为持久化的,此时 RabbitMQ
就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ
哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue
,恢复这个 queue
里的数据。除非极其罕见的是,RabbitMQ
还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
所以,持久化可以跟生产者那边的 confirm
机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack
了,所以哪怕是在持久化到磁盘之前,RabbitMQ
挂了,数据丢了,生产者收不到 ack
,也是可以自己重发的。
如何避免消费者丢失数据
消费端丢失消息一般是在自动 ack
的情况下,拿到消息在处理过程中发生异常导致数据丢失。所以在对消息传递可靠性要求比较高的情况下需要切换到手动 ack
的模式,只有最后消费成功了,再给RabbitMQ
确认。
对于一些涉及到钱的账务性交易,一般要保证消费的幂等性(重复多次结果一样),或者不让其重复消费。
重复消费一般是由第一次消费失败,重入队列进行再次投递消费引起的。所以可以使用自动 ack
来避免这种情况,但是这样也会容易导致消息丢失。
更好地办法就是在关键操作(不能进行重复的操作,比如扣费
)之前就手动 ack
掉,主要是避免明明已经消费成功,但在最后发生意外引起 nack/rejected
的情况。使用这种方式只能说是丢失消息的概率会比自动 ack
更小,但不能完全保证消息不丢失。所以一般我们消费前会提前入库记表以便后期对账追溯。
这边允许消息进行重复消费,但是每次消费需要保证结果一致,即涉及到状态变化的交易在交易成功后的再次消费都要求失败。
可以通过消息中带有一个 uuid
的随机字段,每次消费前需要查询判断是否已经存在或者说已经被消费。还可以通过数据库字段的唯一约束来保证重复数据不会插入多条。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
综上,各种对比之后,有如下建议:
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ
,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐用这个;
后来大家开始用 RabbitMQ
,但是确实 erlang
语言阻止了大量的 Java
工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司会去用 RocketMQ
,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ
已捐给 Apache
,但 GitHub
上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ
,否则回去老老实实用 RabbitMQ
吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ
是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ
是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka
是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。