ActiveMQ原理分析

持久化消息和非持久化消息的发送策略

消息同步发送和异步发送

         ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。

       异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所 以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消 息的时候,尽量去开启事务会话

     除了持久化消息和非持久化消息的同步和异步特性外,还可以通过以下几种方式来设置异步发送

消息的发送原理分析图解

ProducerWindowSize的含义

producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。代码在:ActiveMQSession的1957行

      主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消 息之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺 寸减少(producerAck.size,此size表示先前发送消息的大小)。

可以通过如下2种方式设置:
Ø 在brokerUrl中设置: "tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的producer生效。
Ø 在destinationUri中设置: "test-queue?producer.windowSize=1048576",此参数只会对使用此Destination实例的producer失效,将会覆盖brokerUrl中的producerWindowSize值。

持久化消息和非持久化消息的存储原理

正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在 ${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点

SystemUsage配置设置了一些系统内存和硬盘容量

      Ø 从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀 值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重 启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除

消息的持久化策略分析

       消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者 发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去 后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息 从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式

持久化存储支持类型

ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是 一致的。
Ø KahaDB存储(默认存储方式) Ø JDBC存储
Ø Memory存储
Ø LevelDB存储
Ø JDBC With ActiveMQ Journal

KahaDB存储

KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个 索引文件来存储它所有的地址。KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到 data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB的配置方式

KahaDB的存储原理

在data/kahadb这个目录下,会生成四个文件
Ø db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
Ø db.redo 用来进行消息恢复
Ø db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较 快的。默认是32M,达到阀值会自动递增
Ø lock文件 锁,表示当前获得kahadb读写权限的broker

JDBC存储

使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。

ACTIVEMQ_MSGS 消息表,queue和topic都存在这个表中
ACTIVEMQ_ACKS 存储持久订阅的信息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCKS 锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库

JDBC存储实践

     dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这 样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false

     Mysql持久化Bean配置

添加Jar包依赖

消息消费流程图

消息重发的情况

在正常情况下,有几中情况会导致消息重新发送

Ø 在事务性会话中,没有调用session.commit确认消息或者调用

session.rollback 方法回滚消息

Ø 在非事务性会话中,ACK模式为CLIENT_ACKNOWLEDGE的情况下,没有调用 acknowledge 或者调用了 recover 方法;

       一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会 给 broker 发送一个”poison ack”(ActiveMQMessageConsumer#dispatch: 1460 行),表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会 把这个消息放到 DLQ(死信队列)。


 ActiveMQ 的优缺点

      ActiveMQ 采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反比。

缺点

      1.吞吐量低。由于 ActiveMQ 需要建立索引,导致吞吐量下降。这是无法克服 的缺点,只要使用完全符合 JMS 规范的消息中间件,就要接受这个级别的 TPS。
      2.无分片功能。这是一个功能缺失,JMS 并没有规定消息中间件的集群、分片机制。而由于 ActiveMQ 是伟企业级开发设计的消息中间件,初衷并不是为了 处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向 拆分。ActiveMQ 官方不提供分片机制,需要自己实现。

适用场景

      对TPS要求比较低的系统,可以使用ActiveMQ来实现,一方面比较简单,能 够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面

不适用的场景

       消息量巨大的场景。ActiveMQ 不支持消息自动分片机制,如果消息量巨大, 导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。