消息队列概述及ActiveMQ使用

消息中间件定义

通常认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其他各个子系统进行集成。html

为何要用消息中间件

随着系统的发展,各个模块愈来愈庞大、业务逻辑愈来愈复杂,必然要作服务化和业务拆分的,这个时候各个系统之间的交互,RPC是首选。可是随着系统的继续发展,一些功能涉及几十个服务的调用,这时候须要消息中间件来解决问题。java

消息中间件主要解决分布式系统之间消息的传递,同时为分布式系统中其余子系统提供了伸缩性和扩展性。为系统带来了:
1.低耦合,不论是程序仍是模块之间,使用消息中间件进行间接通讯。
2.异步通讯能力,使得子系统之间得以充分执行本身的逻辑而无需等待。
3.高并发能力,将高峰期大量的请求存储下来慢慢交给后台进行处理,好比适用于秒杀业务。spring

和RPC区别

RPC和消息中间件的场景的差别很大程度上在于就是“依赖性”和“同步性”:
RPC是强依赖,典型的同步方式,像本地调用。消息中间件方式属于异步方式。消息队列是系统级、模块级的通讯。RPC是对象级、函数级通讯。apache

业务上的必须环节通常用RPC,对于一些不影响流程的不是强依赖的能够考虑消息队列,如发送短信,统计数据,解耦应用。编程

消息队列应用场景

1.异步处理;2.应用解耦;3.限流;4.日志处理;5消息通信windows


经常使用的消息中间件比较

clipboard.png


JMS规范(ActiveMQ基于/实现JMS规范)

JMS规范包含如下6个要素
1.链接工厂;2.JMS链接;3.JMS会话;4.JMS目的(Broker);5.JMS生产者;6.JMS消费者数组


JMS规范的消息
JMS 消息由如下三部分组成:session

  • 消息头。每一个消息头字段都有相应的getter 和setter 方法。
  • 消息属性。若是须要除消息头字段之外的值,那么可使用消息属性。
  • 消息体。JMS 定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage(BIO) 和 ObjectMessage。ActiveMQ也有对应的实现。

注意1:生产者及消费者的消息类型必须一致才能接受到消息。
注意2:通常来讲不用对象消息类型,传输对象消息,对象得序列化(实现Serializable接口),JDK自己的序列化效率低,产生的字节码数组大。可把对象序列化成JSON串。
注意3:社区查阅:通过性能测试消息建议不超过1K(1024字节),大消息如大于1M的选择kafka性能好并发

JMS消息模型异步

1.Point-to-Point点对点:
生产者发布消息到队列queue上,若没有对应的消费者则消息保留;若queue上有多个消费者的时候,消息只会被一个消费者消费。

clipboard.png

2.Topic/主题(发布与订阅)(广播):
生产者发布消息到主题,主题会向全部消费者(订阅者)发送消息;若没有消费者在线,则消息丢失也就相似广播,没了就没了。

clipboard.png


ActiveMQ安装

官网http://activemq.apache.org/ac...:8161/admin为后台管理平台可查询队列状况及消息条数等等。

查看activemq.xml可查看ActiveMQ应用的缺省端口为61616,8161为管理平台端口。

ActiveMQ的使用

一:原生API编程(最灵活,重要)

消费者:看代码很明显是基于JMS规范的要素来编程的,要通讯必需要创建链接。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    /*默认链接用户名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认链接密码*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认链接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        /* 链接工厂*/
        ConnectionFactory connectionFactory;
        /* 链接*/
        Connection connection = null;
        /* 会话*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的消费者*/
        MessageConsumer messageConsumer;

        /* 实例化链接工厂*/
        connectionFactory
                = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            /* 经过链接工厂获取链接*/
            connection = connectionFactory.createConnection();
            /* 启动链接*/
            connection.start();
            /* 建立session*/
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /* 建立一个名为HelloWorldQueue消息队列*/
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 建立消息消费者*/
            messageConsumer = session.createConsumer(destination);
            Message message;
            while((message = messageConsumer.receive())!=null){
                System.out.println("收到消息"+((TextMessage)message).getText());
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

生产者:一样的生产者也是基于JMS规范要素。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProducer {

    /*默认链接用户名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认链接密码*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认链接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final int SENDNUM = 5;

    public static void main(String[] args) {
        /* 链接工厂*/
        ConnectionFactory connectionFactory;
        /* 链接*/
        Connection connection = null;
        /* 会话*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的生产者*/
        MessageProducer messageProducer;

        /* 实例化链接工厂*/
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,
                BROKEURL);
        try {
            /* 经过链接工厂获取链接*/
            connection = connectionFactory.createConnection();
            /* 启动链接*/
            connection.start();
            /* 建立session
            * 第一个参数表示是否使用事务,第二次参数表示是否自动确认*/
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            /* 建立一个名为HelloWorldQueue消息队列*/
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 建立消息生产者*/
            messageProducer = session.createProducer(destination);
            /* 循环发送消息*/
            for(int i=0;i<SENDNUM;i++){
                String msg = "发送消息"+i+" "+System.currentTimeMillis();
                TextMessage textMessage = session.createTextMessage(msg);
                System.out.println("标准用法:"+msg);
                messageProducer.send(textMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

先启动ActiveMQ再执行代码demo就能够了解对应的特性,如上代码为点对点模型,无论消费者在生产者先后启动的都能接受到消息,毕竟生产者发布的消息无对应的消费者消费时,队列会保存消息。
一样的也能够测试下主题模式,如上放开注释便可。


二:Spring整合

生产者配置:

<!-- ActiveMQ 链接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName="" password="" />

<!-- Spring Caching链接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 队列模式-->
    <property name="pubSubDomain" value="false"></property>
</bean>

<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 发布订阅模式-->
    <property name="pubSubDomain" value="true"></property>
</bean>

<!--Spring JmsTemplate 的消息生产者 end-->

Queue生产者:直接注入队列模式的bean便可使用

@Component
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                //TODO  应答
                return msg;
            }
        }); 
        
    }
      
}

Topic生产者:

@Component
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
    }
}

消费者配置:

<!-- ActiveMQ 链接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName="" password="" />

<!-- Spring Caching链接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- 消息消费者 start-->

<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
    <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>

<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
    <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 end -->

队列消费者:

@Component
public class QueueReceiver1 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

主题消费者:

@Component
public class TopicReceiver1 implements MessageListener {
    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

SpringBoot整合可在官网查询对应配置