消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景能够简单地描述为当不须要当即得到结果,可是并发量需控制时就须要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另外一种是订阅\发布的消息模式。java

点对点的消息模式apache

点对点的模式主要创建在一个队列上,当链接一个列队时,发送方不须要知道接收方是否正在接收消息,能够直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,若是接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式能够有多个接收方和发送方,可是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而以后的接收方则接收不到已被接收过的消息。服务器

Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增长ActiveMQ依赖jar配置以下:session

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

点对点的发送方逻辑代码并发

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//链接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被建立
            Destination destination = session.createQueue("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

点对点的接收方代码tcp

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的消息模式分布式

订阅/发布模式有多个接收方和发送方,可是接收方与发送方存在时间上的依赖,若是发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特色就是发送方发送的消息会被全部的接收方接收到,与点对点模式偏偏相反。ide

Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增长ActiveMQ依赖jar配置以下:url

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

订阅/发布的发送方代码code

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//链接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被建立
            Destination destination = session.createTopic("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的接收方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}