经过上一篇文章 《消息队列深刻解析》,咱们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。java
这一篇文章,主要带你们详细了解一下消息队列ActiveMQ的使用。面试
学习消息队列ActiveMQ的使用以前,咱们先来搞清JMS。spring
JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间能够经过JMS服务进行异步的消息传输。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,容许应用程序组件基于JavaEE平台建立、发送、接收和读取消息。它使分布式通讯耦合度更低,消息服务更加可靠以及异步性。apache
JMS定义了五种不一样的消息正文格式,以及调用的消息类型,容许你发送并接收以一些不一样形式的数据,提供现有消息格式的一些级别的兼容性。服务器
使用队列(Queue)做为消息通讯载体;知足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。好比:咱们生产者发送100条消息的话,两个消费者来消费通常状况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)后面咱们会经过代码演示来验证。微信
发布订阅模型(Pub/Sub) 使用主题(Topic)做为消息通讯载体,相似于广播模式;发布者发布一条消息,该消息经过主题传递给全部的订阅者,在一条消息广播以后才订阅的用户则是收不到该条消息的。session
参考:https://blog.csdn.net/shaobin...app
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
安装过程很简单这里就不贴安装过程了,能够自行google.异步
添加Maven依赖socket
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
生产者发送消息测试方法:
@Test public void testQueueProducer() throws Exception { // 一、建立一个链接工厂对象,须要指定服务的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 二、使用工厂对象建立一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 三、开启链接,调用Connection对象的start方法。 connection.start(); // 四、建立一个Session对象。 // 第一个参数:是否开启事务。若是true开启事务,第二个参数无心义。通常不开启事务false。 // 第二个参数:应答模式。自动应答或者手动应答。通常自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、使用Session对象建立一个Destination对象。两种形式queue、topic,如今应该使用queue Queue queue = session.createQueue("test-queue"); // 六、使用Session对象建立一个Producer对象。 MessageProducer producer = session.createProducer(queue); // 七、建立一个Message对象,可使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一个ActiveMQ队列目的地的消息"); // 八、发送消息 producer.send(textMessage); } // 九、关闭资源 producer.close(); session.close(); connection.close(); }
消费者消费消息测试方法
@Test public void testQueueConsumer() throws Exception { // 建立一个ConnectionFactory对象链接MQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 建立一个链接对象 Connection connection = connectionFactory.createConnection(); // 开启链接 connection.start(); // 使用Connection对象建立一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个Destination对象。queue对象 Queue queue = session.createQueue("test-queue"); // 使用Session对象建立一个消费者对象。 MessageConsumer consumer = session.createConsumer(queue); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
咱们开启两个消费者进程来监听(运行两次testQueueConsumer()方法)。
而后咱们运行运行生产者测试方法发送消息.先发送消息仍是先监听消息通常不会不影响。
效果以下:
两个消费者各自消费一半消息,并且仍是按照消息发送到消息队列的顺序,这也验证了咱们上面的说法。
第一个消费者
第二个消费者
生产者发送消息测试方法:
@Test public void testTopicProducer() throws Exception { // 一、建立一个链接工厂对象,须要指定服务的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 二、使用工厂对象建立一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 三、开启链接,调用Connection对象的start方法。 connection.start(); // 四、建立一个Session对象。 // 第一个参数:是否开启事务。若是true开启事务,第二个参数无心义。通常不开启事务false。 // 第二个参数:应答模式。自动应答或者手动应答。通常自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、使用Session对象建立一个Destination对象。两种形式queue、topic,如今应该使用topic Topic topic = session.createTopic("test-topic"); // 六、使用Session对象建立一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 七、建立一个Message对象,可使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一个ActiveMQ队列目的地的消息"); // 八、发送消息 producer.send(textMessage); } // 九、关闭资源 producer.close(); session.close(); connection.close(); }
消费者消费消息测试方法:
@Test public void testTopicConsumer() throws Exception { // 建立一个ConnectionFactory对象链接MQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 建立一个链接对象 Connection connection = connectionFactory.createConnection(); // 开启链接 connection.start(); // 使用Connection对象建立一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个Destination对象。topic对象 Topic topic = session.createTopic("test-topic"); // 使用Session对象建立一个消费者对象。 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消费者启动。。。。"); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
先运行两个消费者进程(提早订阅,否则收不到发送的消息),而后运行生产者测试方法发送消息。
结果是:
两个消费者进程均可以接收到生产者发送过来的全部消息,我这里就不贴图片了,
这样验证了咱们上面的说法。
咱们从上面代码就能够看出,点对点通讯和发布订阅通讯模式的区别就是建立生产者和消费者对象时提供的Destination对象不一样,若是是点对点通讯建立的Destination对象是Queue,发布订阅通讯模式通讯则是Topic。
整合spring除了咱们上面依赖的Jar包还要依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.2.7.RELEASE</version> </dependency>
好比咱们在咱们的系统中如今有两个服务,第一个服务发送消息,第二个服务接收消息,咱们下面看看这是如何实现的。
发送消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.155:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生产者 --> <!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> </beans>
发送消息的测试方法:
@Test public void testSpringActiveMq() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //从spring容器中得到JmsTemplate对象 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //从spring容器中取Destination对象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //使用JmsTemplate对象发送消息。 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //建立一个消息对象并返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
咱们上面直接ApplicationContext的getBean方法获取的对象,实际在项目使用依赖注入便可。
建立一个MessageListener的实现类。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息内容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
接收消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置监听器 --> <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" /> <!-- 消息监听容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
测试接收消息的代码
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
欢迎关注个人微信公众号:"Java面试通关手册"(一个有温度的微信公众号,期待与你共同进步~~~坚持原创,分享美文,分享各类Java学习资源):。