利其器-ActiveMQ

简介
消息队列,用于系统之间通信。
与dubbo+zookeeper 在不同层之间调用,ActiveMQ 可以解决同层之间的通信。
dubbo+zookeeper 有启动顺序,如果服务提供者没有启动,先启动消费者,会报错。所以同层之间如果来回引用,就不知道怎么启动了。
ActiveMQ可以实现,数据同步。如果新添加的数据,可以使用ActiveMQ 通知其他用到这些数据服务且存到缓存中的,可以更新缓存了(solr搜索引擎,redis 缓存)。
安装
运行:/bin activemq
用户名:admin
密码:admin


消息传递
对于消息的传递有两种类型:
一种是点对点的 ,即一个生产者和一个消费者一一对应;
另一种是发布 / 订阅模式 ,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  · StreamMessage -- Java原始值的数据流
  · MapMessage--一套名称-值对
  · TextMessage--一个字符串对象
  · ObjectMessage--一个序列化的 Java对象
  · BytesMessage--一个字节的数据流
测试
点对点
发送方
public void testQueueProducer() throws Exception {
//1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用Connection对象的start方法。
connection.start();
//4、创建一个Session对象。
//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
Queue queue = session.createQueue("springtest_queue");
//6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
//7、创建一个Message对象,可以使用TextMessage。
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello Activemq");*/
TextMessage textMessage = session.createTextMessage("Hello AcitveMQ!");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
接收方 开一个socket 监听这个接口
public void testQueueConsumer() throws Exception {
//创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//创建一个连接对象
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个Destination对象。queue对象
Queue queue = session.createQueue("springtest_queue");
//使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(queue);
//接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待接收消息
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
订阅者模式
发送者 唯一改变的是
Topic topic = session.createTopic("");
接收者
Topic topic = session.createTopic("springtestTopic");
两者的不同是
点对点 会将消息存储
订阅者 不管你接收没接受到
spring+ActiveMQ
步骤
添加依赖包
< dependency >
< groupId > org.springframework </ groupId >
< artifactId > spring- jms </ artifactId >
</ dependency >
< dependency >
< groupId > org.springframework </ groupId >
< artifactId > spring-context-support </ artifactId >
</ dependency >

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
发送方
spring 中配置
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue-test-test</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopictest" />
</bean>
发送方通过 jmstemple 发送 queue 或者 topic
接收方
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopictest" />
</bean>
<!-- 配置监听器 -->
<bean id="myMessageListener" class="com.v5.springtest.message.messageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>

配置一个spring实现好的调用监听器类DefaultMessageListenerContainer 传入连接,发送方式,监听器,当创建这个对象的时候就不停的监听了。
在实现 messageListener 中编写我们 接收到信息以后 要做的事情。

注意:如果将通知放到service 层那么就会遇到 ,事务还没提交就求发送消息了,这样数据库中数据还没有更新。

所以可以在接收消息的时候,延迟一下在取查询数据库。
可以放在表现层的 controller 中。