RabbitMQ - 队列中提到,接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当咱们设置为true的时候,说明消费者会经过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,而后把消息从队列中删除。下面用consume的方式作些例子来理解autoAck的参数设置。web
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; GetResponse basicGet(String queue, boolean autoAck) throws IOException;
先往ack队列发送5条数据,能够看到ready是5,total是5。
运行如下代码,autoAck设置为false,且不对消息确认。segmentfault
public static void main(String[] args) throws IOException, TimeoutException { // 声明一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立一个与rabbitmq服务器的链接 Connection connection = factory.newConnection(); // 建立一个Channel Channel channel = connection.createChannel(); // 经过Channel定义队列 channel.queueDeclare("ack", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 接收消息 channel.basicConsume("ack", false, deliverCallback, consumerTag -> { }); }
运行结果以下,打印了5条消息:
在web控制台能够看出,ready是0,unacked是5,即未确认的消息数是5条。
把应用中止掉,即关闭消费者和rabbitmq的链接,web控制台以下,unacked的5条数据回到了ready。
综上,当autoAck为false时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。若是未确认信息的消费者断开了链接,这部分消息会回到ready从新投递给消费者,确保了消息的可靠性。须要注意的是,若是消费者一直没有断开链接,也没有进行确认,那这个消息会一直等待确认中。安全
确认有两种,一个是自动确认,一个是手动确认。自动确认的话,就是把autoAck设置为true。
当rabbitmq向消费者传递消息的时候,会带有一个deliveryTag的传递标识,标记唯一地标识信道上的传递,交付标记的做用域是每一个信道,因此必须在接收消息的信道上进行确认。交付标记是递增的正整数,因此咱们看到是1,2,3这样的递增数字。
上面例子中,有注释了一行代码,就是用来手动确认的,第一个参数就是传递标记,第二个参数是是否批量确认。
上面的打印结果输出了deliveryTag的值,从1到5。把注释删了再运行后,能够看到rabbitmq把确认后的消息删除。服务器
void basicAck(long deliveryTag, boolean multiple) throws IOException;
basicAck方法中,有个参数是multiple,是用来批量确认的。当设置为true的时候,RabbitMQ将确认全部未完成的传递标记,包括确认中指定的标记。与其余与确认相关的内容同样,这是按每一个信道肯定范围的。好比收到标记为一、2的没确认,标记为3的确认,那前面两个也会一块儿确认。若是multiple设置为false,则仅确认当前的消息。
咱们看看下面的例子,每处理三个消息确认一次。异步
public static void main(String[] args) throws IOException, TimeoutException { // 声明一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立一个与rabbitmq服务器的链接 Connection connection = factory.newConnection(); // 建立一个Channel Channel channel = connection.createChannel(); // 经过Channel定义队列 channel.queueDeclare("multiple", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("multiple Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); } }; // 接收消息 channel.basicConsume("multiple", false, deliverCallback, consumerTag -> { }); }
启动后,发送两个消息,能够看到打印了两次
控制台显示未确认2个
再发送一条数据,打印了第三个
web控制台看出,已经确认并删除了队列的消息
spa
自动确认,这种模式一般被称为“发了就忘”,当消费端处理异常时,则服务器发送的消息将得不到正确的处理。所以,自动消息确认应该被认为是不安全的。
手动确认模式中,一般使用消息预取,它限制了通道上未完成(“进行中”)交付的数量。可是,对于自动确认,没有这样的限制,因此消费者有可能因为处理消息太慢,致使内存积压、堆耗尽,致使程序没法运行。所以,自动确认模式仅适用于可以高效、稳定地处理配送的消费者。code