RabbitMQ消息可靠性传输

做者:threedayman 恒生LIGHT云社区html

接着上一讲 消息中间件之RabbitMQ初识,这笔咱们来说讲RabbitMQ中消息丢失的问题。已经怎样在核心业务中避免消息丢失。java

血泪故事:商品购物流程中的发货环节引入了RabbitMQ,某天因为网络抖动致使了生产者的消息没有发送到RabbitMQ中,因为没有作消息的可靠性传输保证,消息丢失,致使一批客户迟迟没收到货物而引起投诉,给公司形成了不小的损失。网络

为了不上述悲剧重演,咱们来了解下在RabbitMQ中咱们须要怎样保证消息不丢失。异步

消息丢失会发生在何时

消息的传输过程大体以下图ide

1622462671(1).jpg

消息丢失可能发生在优化

  • Producer端 发送到RabbitMQ中因为网络异常或者服务异常致使消息发送失败。
  • RabbitMQ服务端 异常或者重启致使消息丢失。
  • Consumer端 接收到消息后,消息处理失败,消息丢失。

固然上一讲中有提到在RabbitMQ,生产者发送消息是和Exchange交互,Exchange根据路由规则投递到具体的Queue中,若是路由规则设置有问题,也会致使消息丢失,但此条不在本文讨论重点。url

Producer 消息可靠性保证

为了不因为网络抖动或者RabbitMQ服务端异常致使消息发送失败的问题。能够在Producer发送消息的使用引入了一个确认机制(ack),服务端接收到消息以后,会返回给Producer一个成功或者失败的确认消息。.net

RabbitMQ提供了两种解决方式:3d

  • 事务机制
  • 发送方确认机制

事务方式,主要方法有如下几个code

  • channel.txSelect() 将当前的channel设置成事务模式。
  • channel.txCommit()用于提交事务。
  • channel.txRollback()用于事务回滚

下面代码是简单示例

try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
//发送失败后续处理,重发或者持久化异常消息稍后重试
}

信号的流转过程以下图

1622700218(1).png

图片来源 RabbitMQ实战指南

若是事务可以提交成功,则消息必定到达了RabbitMQ中。

1622700353(1).png

图片来源 RabbitMQ实战指南

事务机制可以解决消息生产者和RabbitMQ之间消息 确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功。但事务机制是同步阻塞进行的,回大大下降RabbitMQ的吞吐量,RabbitMQ提供了一种改进方案,即发送方确认机制。

发送方确认机制:

  • channel.confirmSelect(); 将通道设置确认机制
  • channel.addConfirmListener() 为通道添加ConfirmListener这个回调接口。
  • com.rabbitmq.client.ConfirmListener#handleAck 回调处理正常被RabbitMQ接收的消息。
  • com.rabbitmq.client.ConfirmListener#handleNack回调处理没有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
	}
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
		//这里须要添加消息发送失败处理的代码,从新发送或者持久化后补偿。
	}
});
//模拟一直发送消息的场景
while (true) {
	long nextSeqNo = channel.getNextPublishSeqNo();
	channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
	confirmSet.add(nextSeqNo);
}

上面例子演示了异步confirm的形式,在保证生产者消息被RabbitMQ正常接收,又没有同步阻塞致使明显下降RabbitMQ吞吐量的问题。

RabbitMQ端

为避免RabbitMQ服务异常或者重启致使的消息丢失,须要对作持久化操做,将相关信息保存到磁盘上。要保证消息不丢失须要持久化主队列、持久化。exchange不持久化,在RabbitMQ服务重启后,相关的exchange元数据会丢失,不过消息不丢失,但消息不能发送到这个exchange中了。

  • 队列持久化须要在声明队列的时候将durable参数设置为true。(由于消息是存在与队列中,若是队列不持久化,那RabbitMQ重启后,消息将丢失)
  • 消息持久化经过将投递模式设置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable
                     false,false,null);
channel.basicPublish("",QUEUE_NAME, 
                     MessageProperties.PERSISTENT_TEXT_PLAIN,//具体属性见下面
                     message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN = 
new BasicProperties("text/plain", 
					null, 
					null, 
					2, //deliveryMode
					0, null, null, null, 
					null, null, null, null, null, null);

Consumer端

为保证Consumer端不因消费处理异常或消费者应用重启致使消息丢失。咱们须要以下操做

  • 关闭默认的自动确认。设置为手动确认模式。

手动确认模式:RabbitMQ会等待消费者回复确认信号后才从删除消息。

自动确认模式(默认):RabbitMQ会自动把发出去的消息置为确认,而后删除,无论消费者有没有真正消费到这些消息。

当设置为手动确认模式,对于RabbitMQ服务端而言队列中的消息分为了两种

  • Ready:等待投递给消费者的消息。
  • Unacked:已经投递给消费者,但尚未收到消费者确认新号的消息。

对于Unacked消息,会出现下面几种状况:

  • RabbitMQ收到持有消息的消费者的ack信号,RabbitMQ服务端将会删除该消息。
  • RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为true,RabbitMQ会从新将这条消息存入队列。
  • RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为false,若是队列配置了死信队列,则消息进入死信队列,若是没有配置死信队列,则消息被RabbitMQ从队列中删除。
  • RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者没有断开链接,则服务端会一直等待,没有超时时间。
  • RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者已经断开链接,RabbitMQ会安排该消息从新进入队列。

消息拒绝可使用Channel类中的basicReject或者basicNack方法,下面咱们来看下他们之间的差别。

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:64位的长整型值,做为消息的编号。
  • requeue:是否重入队列配置项。
  • multiple:是否批量处理未被当前消费者确认的消息。

basicReject一次只能拒绝一条消息。

basicNack当multiple为false时一次拒绝一条编号为deliveryTag消息,效果和basicReject同样。当multiple为true时表示拒绝deliveryTag编号以前全部未被当前消费者确认的消息。

咱们来看一个代码示例:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
			 try{
				//消息处理业务逻辑处理
				channel.basicAck(deliveryTag, false);
			 }catch(Exception e){
                 //处理失败处理逻辑
				channel.basicReject(deliveryTag, false);
			 }
         }
     });

经过手动确认模式,RabbitMQ只有在收到持有消息的Consumer的应答信号时,才会删除掉消息,保证消息不因Consumer应用异常而致使消息丢失的问题发生。

看了消费端保证消息不丢失的方案,有小伙伴会有疑问,假如RabbitMQ已经把消息投递给了Consumer,Consumer正常的处理了消息,可是因为网络抖动等缘由,RabbitMQ没有收到Consumer的ack消息,且认为Consumer已经断开链接,那么RabbitMQ会从新将消息放入队列,并投递给消费者。这样会致使某些消息重复投递给Consumer的问题产生。

在此种方案下RabbitMQ确实有可能产生重复消息的问题,咱们将在接下来的文章中去处理这个问题。

该方案只保证消息至少一次投递(At least Once)

死信队列

DLX,全名Dead-Letter-Exchange,死信交换器。当一个消息变为死信(dead message)后,可以被从新DLX上,绑定DLX的队列就是死信队列。

消息变成私信有如下几种可能

  • 消息被拒绝(basicNack/basicReject),而且设置requeue参数为false;
  • 消息过时。
  • 队列超过最大长度。

下面经过一个简化的代码示例来演示下死信队列的使用。详细说明见注释

//声明交换器
channe1.exchangeDeclare("exchange.dlx","direct ",true);
channe1.exchangeDeclare( "exchange.normal "," fanout ",true);
Map<String , Object> args = new HashMap<String, Object>( );
//设置消息超时时间
args.put("x-message-ttl " , 10000);
//经过x-dead-letter-exchange参数来执行DLX
args.put( "x-dead-letter-exchange ","exchange.dlx");
//为DLX指定路由键
args.put( "x-dead-letter-routing-key"," routingkey");
channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args);
channe1.queueBind( "queue.normal ","exchange .normal", "");
channe1.queueDec1are( "queue.d1x ", true , false , false , null) ;
channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey");
channe1.basicPublish( "exchange.normal" , "rk" ,
MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;

消息流程见下图

1622719517(1).jpg

对于RabbitMQ来讲,经过分析死信队列中的消息,能够用于改善和优化系统。

总结:消息丢失可能发生在生产端、服务端、消费端。对于重要业务咱们能够经过上面介绍的方式来确保消息不丢失。你们也能够留言讨论下,在使用RabbitMQ过程当中遇到过哪些坑。

参考文档

  1. RabbitMQ实战指南
  2. https://www.rabbitmq.com/reliability.html#what-can-fail