RabbitMQ - 消息确认

RabbitMQ - 队列中提到,接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当咱们设置为true的时候,说明消费者会经过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,而后把消息从队列中删除。下面用consume的方式作些例子来理解autoAck的参数设置。web

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

不确认

先往ack队列发送5条数据,能够看到ready是5,total是5。
image.png
运行如下代码,autoAck设置为false,且不对消息确认。segmentfault

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个链接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一个与rabbitmq服务器的链接
    Connection connection = factory.newConnection();
    // 建立一个Channel
    Channel channel = connection.createChannel();
    // 经过Channel定义队列
    channel.queueDeclare("ack", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // 接收消息
    channel.basicConsume("ack", false, deliverCallback, consumerTag -> {
    });
}

运行结果以下,打印了5条消息:
image.png
在web控制台能够看出,ready是0,unacked是5,即未确认的消息数是5条。
image.png
把应用中止掉,即关闭消费者和rabbitmq的链接,web控制台以下,unacked的5条数据回到了ready。
image.png
综上,当autoAck为false时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。若是未确认信息的消费者断开了链接,这部分消息会回到ready从新投递给消费者,确保了消息的可靠性。须要注意的是,若是消费者一直没有断开链接,也没有进行确认,那这个消息会一直等待确认中。安全

确认

确认有两种,一个是自动确认,一个是手动确认。自动确认的话,就是把autoAck设置为true。
当rabbitmq向消费者传递消息的时候,会带有一个deliveryTag的传递标识,标记唯一地标识信道上的传递,交付标记的做用域是每一个信道,因此必须在接收消息的信道上进行确认。交付标记是递增的正整数,因此咱们看到是1,2,3这样的递增数字。
上面例子中,有注释了一行代码,就是用来手动确认的,第一个参数就是传递标记,第二个参数是是否批量确认。
上面的打印结果输出了deliveryTag的值,从1到5。把注释删了再运行后,能够看到rabbitmq把确认后的消息删除。服务器

void basicAck(long deliveryTag, boolean multiple) throws IOException;

批量确认

basicAck方法中,有个参数是multiple,是用来批量确认的。当设置为true的时候,RabbitMQ将确认全部未完成的传递标记,包括确认中指定的标记。与其余与确认相关的内容同样,这是按每一个信道肯定范围的。好比收到标记为一、2的没确认,标记为3的确认,那前面两个也会一块儿确认。若是multiple设置为false,则仅确认当前的消息。
咱们看看下面的例子,每处理三个消息确认一次。异步

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个链接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一个与rabbitmq服务器的链接
    Connection connection = factory.newConnection();
    // 建立一个Channel
    Channel channel = connection.createChannel();
    // 经过Channel定义队列
    channel.queueDeclare("multiple", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("multiple Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }
    };
    // 接收消息
    channel.basicConsume("multiple", false, deliverCallback, consumerTag -> {
    });
}

启动后,发送两个消息,能够看到打印了两次
image.png
控制台显示未确认2个
image.png
再发送一条数据,打印了第三个
image.png
web控制台看出,已经确认并删除了队列的消息
image.pngspa

使用哪一种确认方式

自动确认,这种模式一般被称为“发了就忘”,当消费端处理异常时,则服务器发送的消息将得不到正确的处理。所以,自动消息确认应该被认为是不安全的。
手动确认模式中,一般使用消息预取,它限制了通道上未完成(“进行中”)交付的数量。可是,对于自动确认,没有这样的限制,因此消费者有可能因为处理消息太慢,致使内存积压、堆耗尽,致使程序没法运行。所以,自动确认模式仅适用于可以高效、稳定地处理配送的消费者。code