学习ActiveMQ(三):发布/订阅模式(topic)演示

1.在这个项目中新增两个java类,主题生产者和主题消费者:

  2.和点对点的代码差别并不大,所以将消费者和生产者的分别代码拷入新增的java类中,再修改就好了。

appProducerTopic代码:黑色字体是做出了修改,由创建队列改为了创建主题。

复制代码

package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appProducerTopic {

    private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式
    private static final String topicName = "topic-test";//定义主题的名称

    public static void main(String[] args) throws  JMSException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建destination
        Destination destination = session.createTopic(topicName);
        //6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            TextMessage textMessage = session.createTextMessage("test" + i);
            //7.发送消息
            producer.send(textMessage);

            System.out.println("发送消息" + textMessage.getText());

        }
        //8.关闭连接
        connection.close();
    }
}

复制代码

appConsumerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。

复制代码

package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appConsumerTopic {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";//定义主题的名称

    public static void main(String[] args) throws  JMSException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建destination
        Destination destination = session.createTopic(topicName);
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收到的消息:" + textMessage.getText());

                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了)
        //connection.close();
    }
}

复制代码

  3.测试

  首先启动消费者这个java类,观察控制台,如下图:

 

  接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。

 

  此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。

   6.打开activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下图所示:有一个名字是我们设置的queue-test的主题,消费者也有一个就是我们创建的那个消费者类,主题中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。

  7.那么如果我启动了两个订阅相同的消费者呢?为了结果能清晰,重启activemq服务,关掉之前的Java类启动,然后启动两边消费者,再启动一个生产者。如下图:生产者生产了100条消息。

 

  8.分别看看两个消费者的接收消息,如下两张图:两个消费者都接受到了一模一样的100条消息。

  9.总结:主题订阅发布模式,有多个消费的订阅相同时,消费者不会相互相互影响,都会分别接收到生产者的全部消息。