消息队列的使用<二>:ActiveMQ的基本使用(Java)

[toc]html

首发时间:2019-05-16java


ActiveMQ

<br>linux

介绍

  • ActiveMQ是Apache旗下的一款开源的消息队列(消息中间件MOM,Message Oriented Middleware)
  • 它彻底支持JMS,支持JMS1.1和J2EE 1.4 规范。
  • 支持多种网络协议。
  • 兼容多种语言(C,C++,Java,Python,PHP)
  • 能够方便地与spring进行整合开发。
  • (其余的一些说的太多可能也不是很懂,之后想了解再本身了解吧,毕竟这里只是个小博文。)

<br> <br>spring

下载、安装和初次运行

1.下载:从ActiveMQ官网下载ActiveMQ,地址:http://activemq.apache.org/download.html 2.安装:下载下来的是一个压缩包,解压即安装,直接解压到一个目录便可; 3.初次运行:(在启动ActiveMQ前,请先要已经安装和配置好JDK)在windows版本的activemq中在activeMQ/bin下面有两个目录,为win32,win64,根据本身的系统位数进入不一样的目录,而后直接双击目录下的activemq.bat (在linux中为: ./activemq start) 4.检测是否启动:ActiveMQ默认使用61616端口提供JMS服务,使用8161端口提供管理控制台服务,咱们能够直接访问activemq的管理控制台网页来肯定是否已经开始服务:localhost:8161/admin,默认的用户名和密码都是admin,输入后将进入以下的界面: 5.关闭ActiveMQ:windosw中直接ctrl+c关闭cmd窗口(在linux中: ./activemq stop)apache

<br>编程

Java上初次使用activeMQ

这里以PTP模型的生产者和消费者的消息传递为例。windows

<br>服务器

1.首先导入依赖包,以maven为例:网络

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

2.生产者发送消息(以PTP方式为例):【这里是符合上面的“JMS应用开发基本步骤”的】session

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {

    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty,参数是activemq的服务地址,前缀tcp表明是tcp链接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.使用ConnectionFactory建立connnect,并启动connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.使用Connection建立session,第一个参数是是否使用事务,第二个参数是确认机制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(这里以PTP为例,因此目的地是一个Queue),参数是Queue的名字
        Destination destination = session.createQueue("tempqueue");
        //5.建立生产者,第一个参数是目的地,此时建立的生产者要与目的地进行绑定。
        MessageProducer producer = session.createProducer(destination);
        //6.使用session建立消息,这里使用TEXT类型的消息
        TextMessage textMessage = session.createTextMessage("hello world!");
        //7.生产者发送消息
        producer.send(textMessage);
        //8.提交事务
        session.commit();
        //9.关闭资源
        session.close();
        connection.close();
    }
}

3.消费者接收消息(以PTP方式为例):

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.建立connnect,并启动connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.建立session,第一个参数是是否使用事务,第二个参数是确认机制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地【消费者与生产者的目的地相同才能进行消息传递】
        Destination destination = session.createQueue("tempqueue");
        //5.建立消费者,第一个参数是目的地,此时建立的消费者要与目的地进行绑定。
        MessageConsumer consumer = session.createConsumer(destination);
        //6.使用消费者接受消息消息
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println(message.getText());
        //8.提交事务
        session.commit();
        //9.关闭资源
        session.close();
        connection.close();
    }
}

上述代码解析: 1.前半部分代码都是同样的,都是建立ConnectionFactory,Connection,Session 2.而后建立目的地Destination,这个目的地就是要把消息存储到哪里和从哪里取消息。 3.若是是生产者,那么由Session来建立生产者,建立的时候传入一个目的地,来与生产者绑定,生产者调用send发送的消息都会存储到目的地中。生产者发送的消息须要使用Session来建立,调用createXXXMessage来建立消息,建立什么类型的消息取决于使用什么方法来建立。 4.若是是消费者,那么由Session来建立消费者,建立的时候传入一个消费者,来与消费者绑定,消费者调用receive时会从目的地中获取消息。获取到的结果是一个XXXMessage,一般须要转成对应类型的Message,而后再调用对应的获取消息体的方法来获取消息体。例如TextMessage类型的消息要获取消息体须要调用getText()。 5.若是使用了事务,那么须要session.commit() 6.最后关闭全部资源

<br> <br>

设置请求属性:

设置标准属性:使用消息调用setJMS开头的方法。【要注意的是为了不发生过时的消息,任何直接经过编程方式来调用setJMSExpiration()方法都会被忽略。 】 设置自定义属性:使用消息调用setXXXProperty的方法。 接受属性:

<br> <br>

可靠性机制

在上面的概念学习中你应该了解到,若是不使用事务来进行消息肯定,那么须要手动使用消息来调用acknowledge来确认消息。【并且这时候是在会话层进行确认,因此在这个会话中只要一条消息进行了确认,其余消息也会被确认(即便他收了两条消息只确认了一条)】 当使用了事务的时候,代码中就不要使用acknowledge了,会影响消息的确认。

<br> <br>

事务

在上面的概念学习有提到了事务,事务可使一系列操做共同成功或失败。下面来演示一下事务的使用。 1.首先,在建立Session的时候第一个参数是是否使用事务,要使用事务须要赋值TRUE。 2.提交事务使用session.commit(),回滚是session.rollback() 3.对于生产者,事务是确保消息发送的一致性;对于消费者,事务是确保消息消费的一致性, 4.对于事务的测试可使用单步运行来测试,在发消息处打断点,测试未commit时消费者是否能取到消息。

<br> <br>

消息消费方式

在上面的概念学习中有谈到消费者消费消息的两种方式,一种是堵塞的receive,一种是监听器。监听器与receive最大的区别是若是没有,那么监听器不会等,而receive会等。因此监听器适用于不但愿堵塞程度运行的场景。

receive

上面的代码都是使用堵塞的receive来接收的,你应该能够留意到当运行了消费者后,没有取到消息的时候会一直堵塞在那里。receive也能够设置阻塞时间,时间到了就再也不等了。

监听器:

public class Consumer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("tempqueue2");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage msg = (TextMessage) message;
                try {
                    System.out.println("..."+msg.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        session.commit();
        session.close();
        connection.close();
    }
}

<br> <br>

消息类型

在上面有介绍到消息体有不一样的类型,TextMessage,MapMessage等。 不一样的消息相似影响存储到消息队列的消息的格式,也影响取消息的方式,取消息的方式要与消息类型对应。

TextMessage:数据类型相似于字符串。使用getText()来获取数据。 <br>

MapMessage:数据类型相似于Map。使用getXXX(key)来获取数据。 <br>

<br> <br>

发布/订阅模式

上面主要讲的都是PTP模式,下面来说一下发布/订阅模式。在上面的概念学习中,有涉及到多消费者广播、持久化订阅,下面将演示这些概念的实际使用。

非持久订阅

先演示非持久订阅的,因为非持久订阅只能发送给在线的消费者,因此先运行消费者(多个)。【非持久订阅的消息接收与PTP同样可使用receive】 而后建立生产者发生消息: 注意:在非持久化订阅中,一般要使消费者持续receive,因此一般使用while循环来接受消息。

Message message = consumer.receive();
while(message!=null){
   TextMessage txtMsg = (TextMessage)message;
   sysout(textmsg.getText());
   message = consumer.receive();
}

持久化订阅

要进行持久化订阅,首先要将生产者的发送模式改为持久化模式,这个设置要在connection.start()以前 而后消费者要建立持久订阅器,并且要在消息发送以前先运行一次把持久化订阅器注册到消息队列上。 【注意:须要在链接上设置消费者ID,用来识别消费者,持久化订阅器识别消费者依靠消费者ID,若是不设置,那么下一次“上线”的时候,因为消费者ID会变化,致使订阅器没法与消费者进行关联】

public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.建立connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.建立connnect,并启动connnect
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("001");

        //3.建立session,第一个参数是是否使用事务,第二个参数是确认机制
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("tempTopic");
        TopicSubscriber s1 = session.createDurableSubscriber((Topic) destination, "s1");
        connection.start();
        Message msg = s1.receive();
        while (msg!=null){
            TextMessage txtmsg = (TextMessage) msg;
            System.out.println(txtmsg.getText());
            session.commit(); // 因为此时使用了while,因此要在里面commit
            msg= s1.receive();
        }
        session.close();
        connection.close();
    }
}

<br> <br>

Broker

Broker是ActiveMQ服务器实例。 在使用独立的ActiveMQ程序的时候,有时候会建立不一样需求的服务器实例,一般来讲都是使用某个配置文件进行建立。 而也是能够经过代码内建一个Broker的,内建的Broker比较小巧,适用于一些但愿把Broker整合到项目中的场景。

1.经过BrokerService建立:

上述代码报错java.lang.NoClassDefFoundError:com/fasterxml/jackson/databind/ObjectMapper时可能缺少如下依赖包:

<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

2.经过BrokerFactory建立:

使用BrokerFactory建立时须要一个配置文件,这个配置文件的路径要被传入到BrokerFactory中,在上面的代码中就是properties:broker.properties,配置文件里面是这个Broker的参数。 下面是broker.properties的一个例子:

useJms=true
persistent=false
brokerName=FactoryBroker

对于能配置什么,能够参考使用BrokerService建立Broker时的setXXX。

<br> <br>

整合spring开发

如今不少的项目都使用到了Spring,因此这里也讲一下与Spring的整合。 首先理一下,ActiveMQ有什么能够交给Spring来管理的?能够说能够交给Spring管理的只有Destination,ConnectionFactory,Connection和Broker,只有这几个实例的复用性比较强,这几个的管理会在JmsTemplate使用中展现。 <br>

若是你学过Hibernate之类的框架,你应该知道Spring对Hibernate提供HibernateTemplate来整合,HibernateTemplate封装了Hibernate的一些方法,简化了使用,在使用HibernateTemplate的时候,dao层的类须要继承HibernateDaoSupport,而后同时须要在类中注入Connection,这样HibernateTemplate才能够正常工做。JmsTemplate相似于HibernateTemplate,不过它是面向JMS的。

使用JmsTemplate

1.首先要编写Spring配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--管理destination-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/> <!-- 目的地名称 -->
    </bean>
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="spring-topic"/> <!-- 目的地名称 -->
    </bean>
    
    <!--管理connectionFactory-->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--管理JmsTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <property name="defaultDestination" ref="destinationQueue"></property>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
        </property>
    </bean>

</beans>

2.而后建立测试类:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;
import javax.jms.*;

@RunWith(SpringJUnit4ClassRunner.class) //使用junit4进行测试
@ContextConfiguration(locations={"classpath:applicationContext.xml"}) //加载配置文件
public class JmsTemplateDemo {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="destinationQueue")
    private Destination destinationQueue;
    //发送
    @Test
    public  void send() throws JMSException {
        String msg = "hello world!";
        jmsTemplate.send(destinationQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });

    }
    //接收
    @Test
    public  void recv() throws JMSException {
        TextMessage message = (TextMessage) jmsTemplate.receive(destinationQueue);
        System.out.println(message.getText());
    }
}

在上面的代码中可使用jmsTemplate.send来发送消息,使用jmsTemplate.receive来接收消息。对于消息确认和事务管理则不须要关心,JmsTemplate会本身处理的。

监听器

在上面的JmsTemplate接收消息中使用了receive来接收消息,Spring还支持使用监听器来接收消息,配置监听器,来达到一有消息就执行某些操做,这样就省去了消费者的代码。 1.首先,须要须要在spring配置文件中配置DefaultMessageListenerContainer【还有其余几种监听器】:

<!--配置DefaultMessageListenerContainer -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <!--监听器 -->
        <property name="messageListener" ref="myMessageListener"></property>
        <!--监听哪一个目的地 -->
        <property name="destination" ref="destinationQueue"></property>
     </bean>

    <!--配置监听器-->
    <bean id="myMessageListener" class="withspring.MyMessageListener">
    </bean>

2.而后,定义一个监听器:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("recv:"+ msg);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new IllegalArgumentException("消息类型错误!");
        }
    }
}

3.最后,运行随便在这个项目中的一个发送消息的测试方法。

<br> <br>

使用spring集合Broker

1.经过BrokerService:

<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
           
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="brokerName" value="SpringBroker"/>
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:61616</value>
                </list>
            </property>
        </bean>
    </beans>

【还能够经过BrokerFactory或BrokerFactoryBean来建立,这里省略】 【固然,上面的是比较简单的,没有进行权限管理,你也登陆不了管理页,想要肯定是否建立成功能够监听接口也能够进行生产和消费消息】

<br> <br>


后续可扩展内容

<br> 这里只是一篇小博客,写不了太多东西。若是想要了解更精细,能够去买书来看。 下面写一下后续可扩展学习的内容,学不学由我的考虑。 <br> * 传输协议【上面介绍了tcp://localhost:61616,其实还能够容许非TCP的链接】 * 消息存储持久化【消息是怎么进行存储的】 * KahaDB * AMQ * JDBC * 内存存储 * 部署与集群 * 优化

<br>