RabbitMq 操作总结

RabbitMq java操作总结

  1. 消息队列解决了什么问题?
  1. 异步处理
  2. 应用解耦
  3. 流量削锋
  4. 日志处理

 

  1. JAVA操作rabbitmq
  1. Simple 简单队列
  2. work queues工作队列 公平分发 轮询分发
  3. publish/subscribe发布订阅
  4. Routing 路由选择 通配符模式
  5. Topics 主题
  6. 手动和自动确认消息
  7. 队列的持久化和非持久化

 

RabbitMq安装步骤:

https://blog.csdn.net/qq_41950069/article/details/81346675

http://www.noobyard.com/article/p-kpczytby-go.html

  1. 先安装erlong
  2. 再安装rabbitmq

 

Rabbit mq 教程:

https://blog.csdn.net/hellozpc/article/details/81436980

 

安装好后rabbitmq后台登录地址:http://localhost:15672/

账号密码:都是guest

 

Spring boot整合RabbitMq:

http://www.noobyard.com/article/p-zbdqsmbb-et.html

http://www.noobyard.com/article/p-tjutigmg-hy.html

 

SpringBoot整合RabbitMQ之典型应用场景实战一:

http://www.noobyard.com/article/p-rrnndfmc-dp.html

 

springboot+rabbitMq整合开发实战二:模拟用户下单的过程:

http://www.noobyard.com/article/p-vnouhizv-b.html

1、添加用户

用户界面

 

Virtual hosts 管理


Virtual hosts 相当于mysql 的 db ,添加的路径一般以“/”开头

添加路径后在上面的表格中就会显示你添加的路径,点击添加的路径就会进入到下面的界面

给用户进行授权,选择添加的用户进行授权,这样用户才能访问这个地址

 

2、java操作队列

 

2.1简单队列

2.1.1 模型

 

p:消息的生产者

红色区域:队列(Queue)

C:消费者

3个对象: 生产者 队列 消费者

RabbitMQ中的消息都只能存储在Queue中,生产者(图中的P)生产消息并最终投递到Queue中,消费者(图中的C)可以从Queue中获取消息并消费。 

2.1.2 获取MQ连接

Java代码演示:

1、引入maven依赖

  1. <dependency>  
  2.      <groupId>com.rabbitmq</groupId>  
  3.      <artifactId>amqp-client</artifactId>  
  4.      <version>5.7.3</version>  
  5. </dependency>

2、编写连接工具类

  1. public class RabbitConnectionUtil {  
  2.     //在rabbitmq页面自己创建的VirtualHost地址  
  3.     public final static String vHost = "/test_rabbit";  
  4.   
  5.     /** 
  6.      * 创建连接方法 
  7.      * @return 连接 
  8.      */  
  9.     public static Connection getConnection() throws Exception {  
  10.         //创建连接工厂  
  11.         ConnectionFactory connectionFactory = new ConnectionFactory();  
  12.         //设置服务器地址  
  13.         connectionFactory.setHost("127.0.0.1");  
  14.         //设置AHQP 连接端口  
  15.         connectionFactory.setPort(5672);  
  16.         //设置VirtualHost 地址  
  17.         connectionFactory.setVirtualHost(vHost);  
  18.         //设置用户名  
  19.         connectionFactory.setUsername("guest");  
  20.         //设置密码  
  21.         connectionFactory.setPassword("guest");  
  22.         return connectionFactory.newConnection();  
  23.     }  
  24. }  

2.1.3 生产者生产消息

1、首先需要定义队列的名称,这里我将队列名称定义提取出来作为常量,生产者和消费者都是在这个队列中发送和接收消息

  1. public class RabbitConstant {  
  2.     public final static String QUEUE_NAME = "test_simple_queue";  

 

2、发送队列消息

  1. public static void main(String[] args) throws Exception {  
  2.        //创建连接  
  3.        Connection connection = RabbitConnectionUtil.getConnection();  
  4.        //创建通道  
  5.        Channel channel = connection.createChannel();  
  6.        //创建队列声明  
  7.        channel.queueDeclare(RabbitConstant.QUEUE_NAME,false,false,false,null);  
  8.        //向队列中推送消息  
  9.        String msg = "hello simple !";  
  10.        channel.basicPublish("",RabbitConstant.QUEUE_NAME,null,msg.getBytes());  
  11.        System.out.println("send simple success:"+msg);  
  12.        //关闭通道和连接  
  13.        channel.close();  
  14.        connection.close();  
  15. }

2.1.4 消费者接收消息

1、老版本支持的方法,亲测3.x之类的版本支持这种方式,在5.版本已经不支持这种方式

  1. public static void main(String[] args) throws Exception {  
  2.        //创建连接  
  3.        Connection connection = RabbitConnectionUtil.getConnection();  
  4.        //创建通道  
  5.        Channel channel = connection.createChannel();  
  6.        //创建队列消费者  
  7.        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);  
  8.        //监听队列  
  9.        channel.basicConsume(RabbitConstant.QUEUE_NAME,true,queueingConsumer);  
  10.        while (true){  
  11.            Delivery delivery = queueingConsumer.nextDelivery();  
  12.            String msg = new String(delivery.getBody());  
  13.            System.out.println("Recv success:"+msg);  
  14.        }  
  15. }  

 

2、新版本支持方式

  1. public static void main(String[] args) throws Exception {  
  2.      //创建连接  
  3.      Connection connection = RabbitConnectionUtil.getConnection();  
  4.      //创建通道  
  5.      Channel channel = connection.createChannel();  
  6.      //创建队列声明  
  7.      channel.queueDeclare(RabbitConstant.QUEUE_NAME,false,false,false,null);  
  8.      //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  9.      Consumer consumer = new DefaultConsumer(channel) {  
  10.          //获取到达的消息  
  11.          @Override  
  12.          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  13.              super.handleDelivery(consumerTag, envelope, properties, body);  
  14.              //打印接收到的内容  
  15.              String msg = new String(body,"utf-8");  
  16.              System.out.println("Recv success:"+msg);  
  17.          }  
  18.      };  
  19.      //监听队列  
  20.      channel.basicConsume(RabbitConstant.QUEUE_NAME,true,consumer);  

2.1.5 简单队列的不足

耦合性高,生产者--对应消费者(如果我想有多个消费者去消费队列中消息,这时候就不行了)队列名称变更 这时候得同时变更

 

2.2 工作队列(轮询分发 round-robin

2.2.1 模型

 

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

 

为什么会出现工作队列?

Simple队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,处理需要花费时间,这时候队列就会积压了很多消息。

 

2.2.2 生产者

1、首先需要定义队列的名称,这里我将队列名称定义提取出来作为常量,生产者和消费者都是在这个队列中发送和接收消息

  1. public class RabbitConstant {  
  2.     public final static String QUEUE_NAME_SIMPLE = "test_simple_queue";  
  3.     public final static String QUEUE_NAME_WORK = "test_work_queue";  
  4. }  

2、创建生产者

  1. public static void main(String[] args) throws Exception {  
  2.       //创建连接  
  3.       Connection connection = RabbitConnectionUtil.getConnection();  
  4.       //创建通道  
  5.       Channel channel = connection.createChannel();  
  6.       //创建队列声明  
  7.       channel.queueDeclare(RabbitConstant.QUEUE_NAME_WORK,false,false,false,null);  
  8.       //向队列中推送消息  
  9.       for(int i = 0; i<50;i++){  
  10.           String msg = "hello work["+i+"] !";  
  11.           channel.basicPublish("",RabbitConstant.QUEUE_NAME_WORK,null,msg.getBytes());  
  12.           System.out.println("send work success:"+msg);  
  13.           Thread.sleep(20);  
  14.       }  
  15.       //关闭通道和连接  
  16.       channel.close();  
  17.       connection.close();  
  18. }  

2.2.3 消费者

消费者1:

  1. public class Recv1 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  10.         Consumer consumer = new DefaultConsumer(channel) {  
  11.             //获取到达的消息  
  12.             @Override  
  13.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  14.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  15.                 //打印接收到的内容  
  16.                 String msg = new String(body,"utf-8");  
  17.                 System.out.println("【1】Recv success:"+msg);  
  18.                 try {  
  19.                     Thread.sleep(600);  
  20.                 } catch (InterruptedException e) {  
  21.                     e.printStackTrace();  
  22.                 }finally {  
  23.                     System.out.println("【1】 done !");  
  24.                 }  
  25.             }  
  26.         };  
  27.         //监听队列  
  28.         Boolean autoAck = true;  
  29.         channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,autoAck,consumer);  
  30.     }  

消费者2:

  1. public class Recv2 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  10.         Consumer consumer = new DefaultConsumer(channel) {  
  11.             //获取到达的消息  
  12.             @Override  
  13.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  14.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  15.                 //打印接收到的内容  
  16.                 String msg = new String(body,"utf-8");  
  17.                 System.out.println("【2】Recv success:"+msg);  
  18.                 try {  
  19.                     Thread.sleep(200);  
  20.                 } catch (InterruptedException e) {  
  21.                     e.printStackTrace();  
  22.                 }finally {  
  23.                     System.out.println("【2】 done !");  
  24.                 }  
  25.             }  
  26.         };  
  27.         //监听队列  
  28.         Boolean autoAck = true;  
  29.         channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,autoAck,consumer);  
  30.     }  
  31. }  

2.2.4 现象

消费者1休眠时间在600ms和消费者2休眠时间在200ms的情况下,消费者1和消费者2处理的消息是一样多的
消费者1:偶数
消費者2:奇数
这种方式叫做轮询分发(round-robin)结果就是不管谁忙活谁清闲,都不会多给一个消息,任务总是你一个我一个

 

2.3 工作队列(公平分发fail dipatch)

2.3.1 生产者

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_WORK,false,false,false,null);  
  9.         //在每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,1次只处理1个消息  
  10.         //限制发送给同一个消费者 不得超过一次  
  11.         int prefethCount  = 1;  
  12.         channel.basicQos(prefethCount);  
  13.         //向队列中推送消息  
  14.         for(int i = 0; i<50;i++){  
  15.             String msg = "hello work["+i+"] !";  
  16.             channel.basicPublish("",RabbitConstant.QUEUE_NAME_WORK,null,msg.getBytes());  
  17.             System.out.println("send work success:"+msg);  
  18.             Thread.sleep(20);  
  19.         }  
  20.         //关闭通道和连接  
  21.         channel.close();  
  22.         connection.close();  
  23.     }  

2.3.2 消费者

消费者1:

  1. public class Recv1 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         //保证一次只分发一个  
  10.         channel.basicQos(1);  
  11.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  12.         Consumer consumer = new DefaultConsumer(channel) {  
  13.             //获取到达的消息  
  14.             @Override  
  15.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  16.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  17.                 //打印接收到的内容  
  18.                 String msg = new String(body,"utf-8");  
  19.                 System.out.println("【1】Recv success:"+msg);  
  20.                 try {  
  21.                     Thread.sleep(600);  
  22.                 } catch (InterruptedException e) {  
  23.                     e.printStackTrace();  
  24.                 }finally {  
  25.                     System.out.println("【1】 done !");  
  26.                     //手动回执消息  
  27.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  28.                 }  
  29.             }  
  30.         };  
  31.         //监听队列  
  32.         Boolean autoAck = false;//自动应答改为false  
  33.         channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,autoAck,consumer);  
  34.     }  
  35. }  

 

消费者2:

  1. public class Recv2 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         //保证一次只分发一个  
  10.         channel.basicQos(1);  
  11.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  12.         Consumer consumer = new DefaultConsumer(channel) {  
  13.             //获取到达的消息  
  14.             @Override  
  15.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  16.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  17.                 //打印接收到的内容  
  18.                 String msg = new String(body,"utf-8");  
  19.                 System.out.println("【2】Recv success:"+msg);  
  20.                 try {  
  21.                     Thread.sleep(200);  
  22.                 } catch (InterruptedException e) {  
  23.                     e.printStackTrace();  
  24.                 }finally {  
  25.                     System.out.println("【2】 done !");  
  26.                     //手动回执消息  
  27.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  28.                 }  
  29.             }  
  30.         };  
  31.         //监听队列  
  32.         Boolean autoAck = false;//自动应答改为false  
  33.        channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,autoAck,consumer);  
  34.     }  

2.3.3 现象

消费者2处理的消息比消费者1多,能者多劳。因为在消费者1中设置的是每次处理完一个消息休眠600ms,而消费者2是休眠200毫秒,所以消费者2比消费者1多。

 

2.4 消息应答与消息持久化

2.4.1 消息应答

  1. //监听队列  
  2. Boolean autoAck = false;//自动应答改为false  
  3. channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,autoAck,consumer);

 

Boolean autoAck = true;(自动确认模式),一旦rabbitmq将消息分发给消费者,就会从内存中删除

这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息

 

Boolean autoAck = false;(手动模式),如果有1个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理完成你可以删了,然后rabbitmq就会删除内存中的消息

 

消息应答默认是打开的,false

 

Message acknowledgment:(消息应答)

大家想,如果我们的rabbitmq挂了,我们的消息依然会丢失

2.4.2 消息的持久化

  1. //创建队列声明  
  2. boolean durale = false;  
  3. channel.queueDeclare(RabbitConstant.QUEUE_NAME_WORK,durale,false,false,null);  

durale:表示是否持久化,false表示不进行持久化,true表示持久化  

我们将程序中的boolean durale = false;改成true是不可以的,尽管代码是正确的,它也不会执行成功,因为我们已经定义了一个叫做“test_work_queue”的队列,这个queue (队列) 是未持久化的,rabbitmq不允许重新定义(不同参数)一个已经存在的队列。如果需要重新定义这个队列的参数,唯一的做法就是进入rabbitmq将这个队列删除,再重新启动java程序,这时就可以重新创建一个修改后参数的队列。

 

2.4.3 queueDeclare方法 队列声明

  1. Queue.DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments) throws IOException;   

该方法有5各参数:

queue 队列的名称

durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候可以保证不丢失相关信息。

exclusive 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接( Connection) 可见的,同 个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果 个连接己经声明了 排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队 列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。

autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队 列连接时,都不会自动删除这个队列。

argurnents: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priorit

 

2.5 订阅模式(publish_subscribe)

2.5.1 模型

解读:

1,一个生产者,多个消费者

2,每一个消费者都有自己的队列

3.生产者没有直接把消息发送到队列 而是发到了交换机 转发器exchange

4,每个对列都要绑定到交换机上

5.生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费

 

注册 --> 邮件 --> 短信

2.5.2 生产者

  1. 首先定义交换机名称
  1. public class RabbitConstant {  
  2.     public final static String QUEUE_NAME_SIMPLE = "test_simple_queue";  
  3.     public final static String QUEUE_NAME_WORK = "test_work_queue";  
  4.     public final static String EXCHANGE_NAME_FANOUT = "test_fanout_exchange";//交换机名称  
  5. }  

   

    2.创建生产者

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //声明交换机  
  8.         channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_FANOUT,"fanout");//分发  
  9.         //向交换机中推送消息  
  10.         String msg = "hello publish subscribe !";  
  11.         channel.basicPublish(RabbitConstant.EXCHANGE_NAME_FANOUT,"", null, msg.getBytes());  
  12.         System.out.println("send publish subscribe success:" + msg);  
  13.   
  14.         //关闭通道和连接  
  15.         channel.close();  
  16.         connection.close();  
  17.     }  
  18. }  

通过控制台可以看到,并没有我们发送的消息,消息哪去了?

丢失了,因为交换机没有存储的能力,在rabbitmq里面只有队列有存储能力,因为这时候还没有队列绑定到这个交换机所以数据丢失了。

​​​​​​​2.5.3 消费者

  1. 首先定义绑定到交换机的队列的名称
  1. public class RabbitConstant {  
  2.     public final static String QUEUE_NAME_SIMPLE = "test_simple_queue";  
  3.     public final static String QUEUE_NAME_WORK = "test_work_queue";  
  4.     public final static String EXCHANGE_NAME_FANOUT = "test_fanout_exchange";//交换机名称  
  5.     public final static String QUEUE_NAME_EXCHANGE_BIND_EMIL = "test_queue_fanout_emil";//绑定到交换机的队列  
  6.     public final static String QUEUE_NAME_EXCHANGE_BIND_SMS = "test_queue_fanout_sms";//绑定到交换机的队列  
  7. }  

 

2、创建消费者

消费者1:

  1. public class Recv1 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_EMIL,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         channel.queueBind(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_EMIL,RabbitConstant.EXCHANGE_NAME_FANOUT,"");  
  11.         //保证一次只分发一个  
  12.         channel.basicQos(1);  
  13.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  14.         Consumer consumer = new DefaultConsumer(channel) {  
  15.             //获取到达的消息  
  16.             @Override  
  17.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  18.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  19.                 //打印接收到的内容  
  20.                 String msg = new String(body,"utf-8");  
  21.                 System.out.println("【1】Recv success:"+msg);  
  22.                 try {  
  23.                     Thread.sleep(10);  
  24.                 } catch (InterruptedException e) {  
  25.                     e.printStackTrace();  
  26.                 }finally {  
  27.                     System.out.println("【1】 done !");  
  28.                     //手动回执消息  
  29.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  30.                 }  
  31.             }  
  32.         };  
  33.         //监听队列  
  34.         Boolean autoAck = false;//自动应答改为false  
  35.         channel.basicConsume(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_EMIL,autoAck,consumer);  
  36.     }  
  37. }  

 

消费者2:

  1. public class Recv2 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_SMS,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         channel.queueBind(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_SMS,RabbitConstant.EXCHANGE_NAME_FANOUT,"");  
  11.         //保证一次只分发一个  
  12.         channel.basicQos(1);  
  13.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  14.         Consumer consumer = new DefaultConsumer(channel) {  
  15.             //获取到达的消息  
  16.             @Override  
  17.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  18.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  19.                 //打印接收到的内容  
  20.                 String msg = new String(body,"utf-8");  
  21.                 System.out.println("【2】Recv success:"+msg);  
  22.                 try {  
  23.                     Thread.sleep(200);  
  24.                 } catch (InterruptedException e) {  
  25.                     e.printStackTrace();  
  26.                 }finally {  
  27.                     System.out.println("【2】 done !");  
  28.                     //手动回执消息  
  29.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  30.                 }  
  31.             }  
  32.         };  
  33.         //监听队列  
  34.         Boolean autoAck = false;//自动应答改为false  
  35.         channel.basicConsume(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_SMS,autoAck,consumer);  
  36.     }  
  37. }  

 

2.5.4 总结

  1. 在控制界面可以看到有两个队列已经绑定到了转换器上
  1. 测试结果:同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。这个怎么说呢,在同一个队列中呢,只能有一个消费者可以获得消息,因为这是点对点,是队列的特性,谁把这个机会抢走了,这个机会就没有了。
  1. 发布订阅类似广播,发送方只管向外发送消息,接收方呢只要去听这个广播就能得到广播的消息,但是如果你刚好不在,那么广播里的内容也就错过了。在rabbitmq中就是绑定到交换机的队列的消费者可以接收到这个消息。

2.6 exchange (交换机 转换器)

2.6.1 说明

作用:一方面是接收生产者的消息,另一方面是向队列推送消息

2.6.1.1 匿名转发

  1. //发送消息  
  2. String exchange = ""
  3. channel.basicPublish(exchange,RabbitConstant.QUEUE_NAME_WORK,null,msg.getBytes());  

 

匿名转发:””  (exchange为空串表示匿名转发)也就是不指定转化器 ,也就是在普通的队列模式中使用

2.6.1.2 不处理路由(Fanout

  1. //声明交换机  
  2. channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_FANOUT,"fanout");//分发  

 

Fanout (不处理路由键)

只要绑定到交换机的队列都能获取到消息

  1. //向交换机中推送消息  
  2. String routingKey= "";  
  3. channel.basicPublish(RabbitConstant.EXCHANGE_NAME_FANOUT,routingKey, null, msg.getBytes());  

 

不处理路由:”” (routingKey为空串表示不处理路由)

2.6.1.3 处理路由(Direct

Direct (处理路由键)

通过模型我们可以看到,在发送方推送消息时指定路由键的名称,在接收方的队列必须指定对应的路由键才能获得相应的消息,这就可以让我们选择性的订阅指定消息

发送方:

  1. //声明交换机  
  2. channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_FANOUT,"direct");//分发  
  3. //向交换机中推送消息  
  4. String msg = "hello publish subscribe !";  
  5. String routingKey= "text_routing_key";  
  6. channel.basicPublish(RabbitConstant.EXCHANGE_NAME_FANOUT,routingKey, null, msg.getBytes()); 

接收方:

  1. //绑定队列到交换机  
  2. String routingKey= "text_routing_key";  
  3. channel.queueBind(RabbitConstant.QUEUE_NAME_EXCHANGE_BIND_EMIL,RabbitConstant.EXCHANGE_NAME_FANOUT,routingKey);  

2.6.1.4 主题(Topic)

将路由键和某模式进行匹配

#匹配一个或多个单词

*匹配一个单词

比如:goods.*只能匹配到 goods.add , goods.# 则可以匹配到goods.add.chongqing

指定为主题模式时,可以通过#、*通配符来匹配不同的路由键

 

生产者:

  1. /声明交换机  
  2. channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_TOPIC,"topic");//分发  
  3. //向交换机中推送消息  
  4. String msg = "hello publish subscribe !";  
  5. String routingKey = "goods.add";//指定路由键  
  6. channel.basicPublish(RabbitConstant.EXCHANGE_NAME_TOPIC,routingKey, null, msg.getBytes());  

接收方:

  1. //绑定队列到交换机  
  2. String routingKey = "goods.#";//指定路由键  
  3. channel.queueBind(RabbitConstant.QUEUE_NAME_TOPIC_TWO,RabbitConstant.EXCHANGE_NAME_TOPIC,routingKey);  

2.6.2 basicPublish 方法 (发送消息)

  1. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;  

exchange:交换机的名称

routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用

mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉

immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留

body:推送的消息内容

简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

 

2.7路由模式

2.7.1 模型

通过模型我们可以看到,在发送方推送消息时指定路由键的名称,在接收方的队列必须指定对应的路由键才能获得相应的消息,这就可以让我们选择性的订阅指定消息

2.7.2 生产者

1、定义路由模式下交换机名称

  1. public class RabbitConstant {  
  2.     /** 
  3.      * 简单队列使用 
  4.      */  
  5.     public final static String QUEUE_NAME_SIMPLE = "test_simple_queue";  
  6.     /** 
  7.      * 工作队列使用 
  8.      */  
  9.     public final static String QUEUE_NAME_WORK = "test_work_queue";  
  10.     /** 
  11.      * 发布订阅使用(不路由) 
  12.      */  
  13.     public final static String EXCHANGE_NAME_FANOUT = "test_fanout_exchange";//交换机名称  
  14.     public final static String QUEUE_NAME_EXCHANGE_BIND_EMIL = "test_queue_fanout_emil";//绑定到交换机的队列  
  15.     public final static String QUEUE_NAME_EXCHANGE_BIND_SMS = "test_queue_fanout_sms";//绑定到交换机的队列  
  16.     /** 
  17.      * 发布订阅使用(路由) 
  18.      */  
  19.     public final static String EXCHANGE_NAME_DIRECT = "test_direct_exchange";//交换机名称  
  20.     public final static String QUEUE_NAME_DIRECT_ONE = "test_queue_direct_one";//绑定到交换机队列  
  21.     public final static String QUEUE_NAME_DIRECT_TWO = "test_queue_direct_two";//绑定到交换机对列  
  22. }  

2. 创建生产者

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //声明交换机  
  8.         channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_DIRECT,"direct");//分发  
  9.         //向交换机中推送消息  
  10.         String msg = "hello publish subscribe !";  
  11.         String routingKey = "error";//指定路由键  
  12. //        routingKey = "warning";  
  13. //        routingKey = "info";  
  14.         channel.basicPublish(RabbitConstant.EXCHANGE_NAME_DIRECT,routingKey, null, msg.getBytes());  
  15.         System.out.println("send publish subscribe success:" + msg);  
  16.   
  17.         //关闭通道和连接  
  18.         channel.close();  
  19.         connection.close();  
  20.     }  

 

2.7.3 消费者

队列名称的定义在上面生产者那里已经给出

 

消费者1:

  1. public class Recv1 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_DIRECT_ONE,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         String routingKey = "error";//指定路由键  
  11.         channel.queueBind(RabbitConstant.QUEUE_NAME_DIRECT_ONE,RabbitConstant.EXCHANGE_NAME_DIRECT,routingKey);  
  12.         //保证一次只分发一个  
  13.         channel.basicQos(1);  
  14.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  15.         Consumer consumer = new DefaultConsumer(channel) {  
  16.             //获取到达的消息  
  17.             @Override  
  18.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  19.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  20.                 //打印接收到的内容  
  21.                 String msg = new String(body,"utf-8");  
  22.                 System.out.println("【1】Recv success:"+msg);  
  23.                 try {  
  24.                     Thread.sleep(600);  
  25.                 } catch (InterruptedException e) {  
  26.                     e.printStackTrace();  
  27.                 }finally {  
  28.                     System.out.println("【1】 done !");  
  29.                     //手动回执消息  
  30.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  31.                 }  
  32.             }  
  33.         };  
  34.         //监听队列  
  35.         Boolean autoAck = false;//自动应答改为false  
  36.         channel.basicConsume(RabbitConstant.QUEUE_NAME_DIRECT_ONE,autoAck,consumer);  
  37.     }  
  38. }  

 

消费者2:

  1. public class Recv2 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_DIRECT_TWO,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         String routingKey = "error";//指定路由键  
  11.         channel.queueBind(RabbitConstant.QUEUE_NAME_DIRECT_TWO,RabbitConstant.EXCHANGE_NAME_DIRECT,routingKey);  
  12.         routingKey = "info";  
  13.         channel.queueBind(RabbitConstant.QUEUE_NAME_DIRECT_TWO,RabbitConstant.EXCHANGE_NAME_DIRECT,routingKey);  
  14.         routingKey = "warning";  
  15.         channel.queueBind(RabbitConstant.QUEUE_NAME_DIRECT_TWO,RabbitConstant.EXCHANGE_NAME_DIRECT,routingKey);  
  16.         //保证一次只分发一个  
  17.         channel.basicQos(1);  
  18.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  19.         Consumer consumer = new DefaultConsumer(channel) {  
  20.             //获取到达的消息  
  21.             @Override  
  22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  23.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  24.                 //打印接收到的内容  
  25.                 String msg = new String(body,"utf-8");  
  26.                 System.out.println("【2】Recv success:"+msg);  
  27.                 try {  
  28.                     Thread.sleep(200);  
  29.                 } catch (InterruptedException e) {  
  30.                     e.printStackTrace();  
  31.                 }finally {  
  32.                     System.out.println("【2】 done !");  
  33.                     //手动回执消息  
  34.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  35.                 }  
  36.             }  
  37.         };  
  38.         //监听队列  
  39.         Boolean autoAck = false;//自动应答改为false  
  40.         channel.basicConsume(RabbitConstant.QUEUE_NAME_DIRECT_TWO,autoAck,consumer);  
  41.     }  

 

2.7.4 总结

通过消费者代码我们可以看到,消费者1只能接收到路由键是“error”的消息,消费者2可以接收到路由键是“error”、“info”、“warning”的消息。

我们通过改变生产者中的路由键名称就能指定到哪些消费者可以接收消息了。

 

2.8 主题模式

2.8.1 模型

Topic Exchange-将路由键和某模式进行匹配。此时队列需要绑定要一个模式上,符号"#"匹配一个或多个词,符号匹配不多不少一个词。因此" audit.# "能够匹配到"audit.irs.corporate",但是" audit.* "只会匹配到"auditirs.irs"。

 

2.8.2 生产者

1、定义主题模式下转换器名称

  1. public class RabbitConstant {  
  2.     /** 
  3.      * 简单队列使用 
  4.      */  
  5.     public final static String QUEUE_NAME_SIMPLE = "test_simple_queue";  
  6.     /** 
  7.      * 工作队列使用 
  8.      */  
  9.     public final static String QUEUE_NAME_WORK = "test_work_queue";  
  10.     /** 
  11.      * 发布订阅使用(不处理路由模式【订阅模式】 Fanout) 
  12.      */  
  13.     public final static String EXCHANGE_NAME_FANOUT = "test_fanout_exchange";//交换机名称  
  14.     public final static String QUEUE_NAME_EXCHANGE_BIND_EMIL = "test_queue_fanout_emil";//绑定到交换机的队列  
  15.     public final static String QUEUE_NAME_EXCHANGE_BIND_SMS = "test_queue_fanout_sms";//绑定到交换机的队列  
  16.     /** 
  17.      * 发布订阅使用(处理路由模式 Direct) 
  18.      */  
  19.     public final static String EXCHANGE_NAME_DIRECT = "test_direct_exchange";//交换机名称  
  20.     public final static String QUEUE_NAME_DIRECT_ONE = "test_queue_direct_one";//绑定到交换机队列  
  21.     public final static String QUEUE_NAME_DIRECT_TWO = "test_queue_direct_two";//绑定到交换机对垒  
  22.   
  23.     /** 
  24.      * 发布订阅使用(主题模式 Topic) 
  25.      */  
  26.     public final static String EXCHANGE_NAME_TOPIC = "test_topic_exchange";//交换机名称  
  27.     public final static String QUEUE_NAME_TOPIC_ONE = "test_queue_topic_one";//绑定到交换机队列  
  28.     public final static String QUEUE_NAME_TOPIC_TWO = "test_queue_topic_two";//绑定到交换机对垒  
  29. }  

 

2、创建生产者

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //声明交换机  
  8.         channel.exchangeDeclare(RabbitConstant.EXCHANGE_NAME_TOPIC,"topic");//分发  
  9.         //向交换机中推送消息  
  10.         String msg = "hello publish subscribe !";  
  11.         String routingKey = "goods.add";//指定路由键  
  12. //        routingKey = "goods.update";  
  13.         channel.basicPublish(RabbitConstant.EXCHANGE_NAME_TOPIC,routingKey, null, msg.getBytes());  
  14.         System.out.println("send publish subscribe success:" + msg);  
  15.         //关闭通道和连接  
  16.         channel.close();  
  17.         connection.close();  
  18.     }  

2.8.3 消费者

队列名称的定义在上面生产者那里已经给出

 

消费者1:

  1. public class Recv1 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_TOPIC_ONE,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         String routingKey = "goods.add";//指定路由键  
  11.         channel.queueBind(RabbitConstant.QUEUE_NAME_TOPIC_ONE,RabbitConstant.EXCHANGE_NAME_TOPIC,routingKey);  
  12.         //保证一次只分发一个  
  13.         channel.basicQos(1);  
  14.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  15.         Consumer consumer = new DefaultConsumer(channel) {  
  16.             //获取到达的消息  
  17.             @Override  
  18.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  19.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  20.                 //打印接收到的内容  
  21.                 String msg = new String(body,"utf-8");  
  22.                 System.out.println("【1】Recv success:"+msg);  
  23.                 try {  
  24.                     Thread.sleep(600);  
  25.                 } catch (InterruptedException e) {  
  26.                     e.printStackTrace();  
  27.                 }finally {  
  28.                     System.out.println("【1】 done !");  
  29.                     //手动回执消息  
  30.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  31.                 }  
  32.             }  
  33.         };  
  34.         //监听队列  
  35.         Boolean autoAck = false;//自动应答改为false  
  36.         channel.basicConsume(RabbitConstant.QUEUE_NAME_TOPIC_ONE,autoAck,consumer);  
  37.     }  
  38. }  

 

消费者2:

  1. public class Recv2 {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         final Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_TOPIC_TWO,false,false,false,null);  
  9.         //绑定队列到交换机  
  10.         String routingKey = "goods.#";//指定路由键  
  11.         channel.queueBind(RabbitConstant.QUEUE_NAME_TOPIC_TWO,RabbitConstant.EXCHANGE_NAME_TOPIC,routingKey);  
  12.         //保证一次只分发一个  
  13.         channel.basicQos(1);  
  14.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  15.         Consumer consumer = new DefaultConsumer(channel) {  
  16.             //获取到达的消息  
  17.             @Override  
  18.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  19.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  20.                 //打印接收到的内容  
  21.                 String msg = new String(body,"utf-8");  
  22.                 System.out.println("【2】Recv success:"+msg);  
  23.                 try {  
  24.                     Thread.sleep(200);  
  25.                 } catch (InterruptedException e) {  
  26.                     e.printStackTrace();  
  27.                 }finally {  
  28.                     System.out.println("【2】 done !");  
  29.                     //手动回执消息  
  30.                     channel.basicAck(envelope.getDeliveryTag(),false);  
  31.                 }  
  32.             }  
  33.         };  
  34.         //监听队列  
  35.         Boolean autoAck = false;//自动应答改为false  
  36.         channel.basicConsume(RabbitConstant.QUEUE_NAME_TOPIC_TWO,autoAck,consumer);  
  37.     }  
  38. }  

 

2.8.4 总结

通过消费者代码我们可以看到,消费者1只能接收到路由键是“goods.add”的消息,消费者2路由键为“goods.#”,可以接收到路由键是“goods.add”、“goods.update”、“goods.delete.tt”等含“goods.”前缀的1-n个单词组成的键的消息。

如果路由键是“goods.*”的话,只能匹配到前缀为“goods.”的一个单词的路由键,比如可以匹配“goods.add”,但是不能匹配“goods.delete.tt”。

 

2.9 RabiitMq的消息确认机制(事务+confirm)

在rabbitmq中我们可以通过持久化数据解决rabbitmq服务器异常的数据丢失问题

问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的

两种方式:
AMOP实现了事务机制
Confirm模式

 

2.9.1 事务机制

txSelect  txCommit  txRollback
txSelect:用户将当前channel设置成transation模式,
txCommit:用于提交事务
txRollback:回滚事务

 

2.9.1.1 生产者

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         try {  
  10.             //开启事务  
  11.             channel.txSelect();  
  12.             //向队列中推送消息  
  13.             String msg = "hello tx message!";  
  14.             channel.basicPublish("",RabbitConstant.QUEUE_NAME_SIMPLE,null,msg.getBytes());  
  15.             //事务提交  
  16.             channel.txCommit();  
  17.             System.out.println("send tx success:"+msg);  
  18.         }catch (Exception e){  
  19.             //事务回滚  
  20.             channel.txRollback();  
  21.             System.out.println("send tx message rollback !");  
  22.         }  
  23.         //关闭通道和连接  
  24.         channel.close();  
  25.         connection.close();  
  26.     }  
  27. }  

2.9.1.2 消费者

  1. public class Recv {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE,false,false,false,null);  
  9.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  10.         Consumer consumer = new DefaultConsumer(channel) {  
  11.             //获取到达的消息  
  12.             @Override  
  13.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  14.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  15.                 //打印接收到的内容  
  16.                 String msg = new String(body,"utf-8");  
  17.                 System.out.println("Recv【tx】 success:"+msg);  
  18.             }  
  19.         };  
  20.         //监听队列  
  21.         channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE,true,consumer);  
  22.     }  
  23. }  

2.9.1.3 总结

在生产者处开启事务处理,如果发生异常则消息将不会进行提交,而是会进行回滚,那么消费者端是接收不到消息的。这个事务处理机制因为连接增加了,降低了消息服务器mq的吞吐量。

 

2.9.2 confirm模式

2.9.2.1 实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

 

Confirm最大的好处是异步处理的,不用等待信道返回确认消息,而继续发送其他消息

如果mq崩溃异常导致数据丢失会返回一条Nack消息,生产者可以利用回调进行处理

 

开启confirm:

channel.confirmSelect();

 

编程模式:

       普通模式:

               发一条:waitForConfirms()

               批量发:waitForConfirms()

       异步confirm模式:

               提供一个回调方法

2.9.2.2 生产者(简单模式)

定义队列名称:

  1. public final static String QUEUE_NAME_SIMPLE_CONFIRM = "test_simple_confirm_queue";  

 

1、Confirm 单条:

  1. public class send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, falsefalsefalse, null);  
  9.         //开启confirm模式  
  10.         channel.confirmSelect();  
  11.         //向队列中推送消息  
  12.         String msg = "hello confirm message!";  
  13.         channel.basicPublish("", RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, null, msg.getBytes()); 
  14.          //确认 
  15.         if(!channel.waitForConfirms()){  
  16.             System.out.println("send confirm failed:" + msg);  
  17.         }else {  
  18.             System.out.println("send confirm success:" + msg);  
  19.         }  
  20.         //关闭通道和连接  
  21.         channel.close();  
  22.         connection.close();  
  23.     }  
  24. }

 

2、批量

  1. public class SendBatch {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, falsefalsefalse, null);  
  9.         //开启confirm模式  
  10.         channel.confirmSelect();  
  11.         //批量向队列中推送消息  
  12.         String sendMsg = "";  
  13.         for(int i = 0;i<20;i++){  
  14.             sendMsg = "hello confirm message{"+i+"] !";  
  15.             channel.basicPublish("", RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, null, sendMsg.getBytes());  
  16.         }  
  17.        //确认
  18.         if(!channel.waitForConfirms()){  
  19.             System.out.println("Send confirm failed:");  
  20.         }else {  
  21.             System.out.println("Send confirm success:");  
  22.         }  
  23.         //关闭通道和连接  
  24.         channel.close();  
  25.         connection.close();  
  26.     }  

 

发完消息在进行确认

 

2.9.2.3 生产者(异步模式)

Channel对象提供的Confirmlistener()回调方法只包含deliveryTag (当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每 publish一条数据,集合中元素加1,每回调一次handleAck方法, unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

 

生产者:

  1. public class Send {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, falsefalsefalse, null);  
  9.         //开启confirm模式  
  10.         channel.confirmSelect();  
  11.         //未确认的消息标识  
  12.         final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());  
  13.         channel.addConfirmListener(new ConfirmListener(){  
  14.             //没有问题的handleAck  
  15.             public void handleAck(long deliveryTag, boolean multiple) {  
  16.                 if(multiple){  
  17.                     System.out.println("----handleAck----multiple");  
  18.                     confirmSet.headSet(deliveryTag+1).clear();  
  19.                 }else {  
  20.                     System.out.println("----handleAck----multiple false");  
  21.                     confirmSet.remove(deliveryTag);  
  22.                 }  
  23.             }  
  24.             //handleNack  
  25.             public void handleNack(long deliveryTag, boolean multiple) {  
  26.                 if(multiple){  
  27.                     System.out.println("----handleNack----multiple");  
  28.                     confirmSet.headSet(deliveryTag+1).clear();  
  29.                 }else {  
  30.                     System.out.println("----handleNack----multiple false");  
  31.                     confirmSet.remove(deliveryTag);  
  32.                 }  
  33.             }  
  34.         });  
  35.         //向队列中推送消息  
  36.         for(int i = 0 ;i<50;i++){  
  37.             long nextPublishSeqNo = channel.getNextPublishSeqNo();  
  38.             String msg = "hello confirm message["+i+"] !";  
  39.             channel.basicPublish("", RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, null, msg.getBytes());  
  40.             confirmSet.add(nextPublishSeqNo);  
  41.         }  
  42.   
  43.         //关闭通道和连接  
  44.         channel.close();  
  45.         connection.close();  
  46.     }  
  47. }  

 

 

2.9.2.4 消费者

  1. public class Recv {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //创建连接  
  4.         Connection connection = RabbitConnectionUtil.getConnection();  
  5.         //创建通道  
  6.         Channel channel = connection.createChannel();  
  7.         //创建队列声明  
  8.         channel.queueDeclare(RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, falsefalsefalse, null);  
  9.         //重写handleDelivery 方法,实现阻塞之后获取消息的处理  
  10.         Consumer consumer = new DefaultConsumer(channel) {  
  11.             //获取到达的消息  
  12.             @Override  
  13.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  14.                 super.handleDelivery(consumerTag, envelope, properties, body);  
  15.                 //打印接收到的内容  
  16.                 String msg = new String(body, "utf-8");  
  17.                 System.out.println("Recv【confirm】 success:" + msg);  
  18.             }  
  19.         };  
  20.         //监听队列  
  21.         channel.basicConsume(RabbitConstant.QUEUE_NAME_SIMPLE_CONFIRM, true, consumer);  
  22.     }  

 

3、Spring集成Rabbitmq

3.1 引入maven依赖

  1. <!-- spring 整合rabbit jar 包-->  
  2. <dependency>  
  3.       <groupId>org.springframework.amqp</groupId>  
  4.       <artifactId>spring-rabbit</artifactId>  
  5.       <version>2.1.8.RELEASE</version>  
  6. </dependency>  

3.2 配置application.xml文件

  1. <beans xmlns="http://www.springframework.org/schema/beans"  
  2.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.        xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  4.        xsi:schemaLocation="http://www.springframework.org/schema/rabbit  
  5.            https://www.springframework.org/schema/rabbit/spring-rabbit.xsd  
  6.            http://www.springframework.org/schema/beans  
  7.            https://www.springframework.org/schema/beans/spring-beans.xsd">  
  8.   
  9.     <!--创建rabiitmq连接工厂-->  
  10.     <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/test_rabbit"/>  
  11.   
  12.     <!--定义Rabiit模板,指定连接工厂和 exchange-->  
  13.     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange"/>  
  14.   
  15.     <!--mq的管理,包括队列、交换器的声明等-->  
  16.     <rabbit:admin connection-factory="connectionFactory"/>  
  17.   
  18.     <!--定义队列,自动声明-->  
  19.     <rabbit:queue name="myqueue" auto-declare="true" durable="true"/>  
  20.   
  21.     <!--定义交换器 自动声明-->  
  22.     <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">  
  23.         <rabbit:bindings>  
  24.             <rabbit:binding queue="myqueue"/>  
  25.         </rabbit:bindings>  
  26.     </rabbit:fanout-exchange>  
  27.   
  28.     <!--队列监听 指定用哪个类中的哪个方法去监听队列-->  
  29.     <rabbit:listener-container connection-factory="connectionFactory">  
  30.         <rabbit:listener ref="foo" method="listen" queue-names="myqueue"/>  
  31.     </rabbit:listener-container>  
  32.   
  33.     <!--消费者-->  
  34.     <bean id="foo" class="com.rabbit.test.spring.MyConsumer"/>  
  35.   
  36. </beans>  

 

3.3 创建消费者监听

  1. public class MyConsumer {  
  2.   
  3.     //具体执行的业务方法  
  4.     public void listen(String foo){  
  5.         System.out.println("消费者:"+foo);  
  6.     }  
  7.   
  8. }  

3.4 创建生产者发送消息

  1. public class SpringMain {  
  2.     public static void main(String[] args) throws Exception {  
  3.         //获得配置上下文  
  4.         AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application.xml");  
  5.         //得到rabbitmq模板  
  6.         RabbitTemplate rabbitTemplate = ctx.getBean(RabbitTemplate.class);  
  7.         //发送消息  
  8.         rabbitTemplate.convertAndSend("hello word !");  
  9.         Thread.sleep(1000);  
  10.         ctx.destroy();  
  11.     }