activeMQ总结

1. 消息中间件

平时工作中一直使用到activeMQ,但是一直没有机会进行总结,现在由于公司业务需求,需要将消息中间键由actveMQ变成RocketMQ,现在对消息中间键进行简单的整理.

2. 什么是MOM(面向消息中间键)?

MOM(Message-oriented middleware)就是面向消息中间件,是用于以分布式应用或系统中的异步,松耦合,可靠,可扩展和安全通信的一类软件.MOM总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合.

目前在生产环境常用的消息队列有:
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。常用对比如下:

对比

3. 什么是activeMQ?

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范(JMS规范)的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

4. activeMQ 两种模式.

  • 点对点(P2P) : 即每产生一个消息并发送,只有一个消息消费者和其一一对应.

    点对点模式主要建立在一个队列上边,当连接到一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息首先进入队列中,如果有接收端在监听,则会发送给接收端,否则保存在activeMQ服务器,知道发现接收端接收消息.点对点可以有多个发送端和多个接收端,但是一个消息只能被一个接收端消费.

  • 发布订阅(Pub/Sub): 即每产生一个消息并发送,可以有多个消费者对消息进行接收和消费.

发布/订阅模式,同样可以有多个发送端和多个接收端,但是接收端如果没有监听消息,那么activeMQ默认不会保存信息,将会认为消息已经发送出去了. 发送端发送的消息如果接收端不在线,收不到消息,即使以后连接上线也不会接收到已经发送的消息, 发布订阅模式中,activeMQ不对保存消息,如果需要保存消息要进一步进行设置.

5.activeMQ使用.

  • 生产者:发送消息,发送端

第一步: 创建ConnectionFactory,需要指定服务端ip以及端口号.

第二步:使用ConncetionFactory对象创建一个Connection对象.

第三步:开启连接,调用activeMQ的start方法.

第四步:使用Connection对象创建一个session对象.

第五步:使用session创建一个Destination对象(topic,queue),此外创建一个Queue对象.

第六步: 使用Session 创建一个Producer对象.

第七步:创建一个Message对象,创建一个TextMessage对象.

第八步: 使用Productor对象发送消息.

第九步: 关闭资源.

public void testQueueProducer() throws Exception {

// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

//brokerURL服务器的ip及端口号

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

// 第二步:使用ConnectionFactory对象创建一个Connection对象。

Connection connection = connectionFactory.createConnection();

// 第三步:开启连接,调用Connection对象的start方法。

connection.start();

// 第四步:使用Connection对象创建一个Session对象。

//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。

//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

//参数:队列的名称。

Queue queue = session.createQueue("test-queue");

// 第六步:使用Session对象创建一个Producer对象。

MessageProducer producer = session.createProducer(queue);

// 第七步:创建一个Message对象,创建一个TextMessage对象。

/*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/

TextMessage textMessage = session.createTextMessage("hello activeMq,我要发送第一个信息了......");

// 第八步:使用Producer对象发送消息。

producer.send(textMessage);

// 第九步:关闭资源。

producer.close();

session.close();

connection.close();

}
  • 消费者:接收消息,消费消息

第一步:创建一个ConncetionFactory对象.

第二步:Connection 对象创建一个Connection对象.

第三步:Connection对象调用start方法开启连接.

第四步:使用 Connection对象创建一个Session对象.

第五步: 使用Connection对象创建一个Destination对象,和发送端保持一致,其中队列Queue的名字也要保持一致.

第六步:使用session创建一个Consumer对象.

第七步:接收消息.

第八步:打印消息.

第九步:关闭资源.

public void testQueueConsumer() throws Exception {

// 第一步:创建一个ConnectionFactory对象。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");

// 第二步:从ConnectionFactory对象中获得一个Connection对象。

Connection connection = connectionFactory.createConnection();

// 第三步:开启连接。调用Connection对象的start方法。

connection.start();

// 第四步:使用Connection对象创建一个Session对象。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

Queue queue = session.createQueue("test-queue");

// 第六步:使用Session对象创建一个Consumer对象。

MessageConsumer consumer = session.createConsumer(queue);

// 第七步:接收消息。

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message message) {

try {

TextMessage textMessage = (TextMessage) message;

String text = null;

//取消息的内容

text = textMessage.getText();

// 第八步:打印消息。

System.out.println(text);

} catch (JMSException e) {

e.printStackTrace();

}

}

});

//等待键盘输入
//这里因为是DEMO的原因,所以需要在这里等待键盘输入,也就是进行阻塞,但是在spring框架中使用activeMQ,因为有框架持有.
System.in.read();

// 第九步:关闭资源

consumer.close();

session.close();

connection.close();

}

消息中间件的优缺点总结:

  1. ActiveMQ:
    非常成熟,功能强大,在业内大量的公司以及项目中都有应用
    偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用.

  2. RabbitMQ:
    erlang语言开发,性能极其好,延时很低;
    吞吐量到万级,MQ功能比较完备
    而且开源提供的管理界面非常棒,用起来很好用
    社区相对比较活跃,几乎每个月都发布几个版本分
    在国内一些互联网公司近几年用rabbitmq也比较多一些
    但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。

优点:

  • 在高吞吐,高可用上较弱.
  • 支持多种客户端语言.
  • 由于erlang语言的特性,性能也比较好,使用RAM模式时,性能很好.
  • 管理界面丰富,在互联网公司有大规模应用.

缺点:

  • erlang语言难度较大,集群不支持动态扩展.
  • 不支持事务,消息吞吐能力有限.
  • 消息堆积是,性能明显降低.
  1. RocketMQ:
    接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障
    日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控.

优点:

  • 在高吞吐量,低延迟,高可用上有非常好的表现,消息堆积时性能也很好.
  • api,系统设计都更适合在业务处理场景.
  • 支持多种消费方式.
  • 支持broker消息过滤.
  • 支持事物.
  • 提供消息顺序消费能力:consumer可以水平扩展,消费能力强.
  • 集群模式规模在50台左右,单日处理消息上百亿.

缺点:

  • 相比于kafka,使用者少,生态不够完善.
  • 不支持主从自动切换.
  • 客户端只支持java.
  1. Kafka:
    kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展
    同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略.
    这个特性天然适合大数据实时计算以及日志收集.

优点:

  • 在高吞吐,低延迟,高可用集群热扩展集群容错上有非常好的表现.
  • producer端提供缓存,压缩功能,节省性能,提高效率.
  • 提供顺序消费能力.
  • 提供多种客户端语言.
  • 生态完善.在大数据处理有噢诶套设施.
    缺点:
  • 消费集群数目受到分区数目的限制.
  • 单机topic多时,性能明显降低.
  • 不支持事务.