Kafka精通:Producer发送数据丢失、重复、乱序缘由及解决方案

生产环境对于生产者来讲,Kafka集群发送消息常常会遇到消息丢失、重复、乱序等问题,下面咱们来说解一下出现这些问题的缘由及解决方案。node


1.咱们知道Kafka为保障数据的可靠性,采用了多副本的存储机制服务器


    假设一个Topic拆分为了3个Partition,分别是PartitionA,PartitonB,PartitionC,此时每一个Partition都有2个副本。好比PartitionA有一个副本是leader,另一个副本是follower,leader和follower两个副本是分布在不一样机器上的。网络

    通常生产环境咱们都会指定Topic的数据有3个副本,即便一台broker挂掉之后,数据也不会完全丢失,由于其余broker还存在另外的副本数据。并发


2.副本数据是如何进行同步的呢?异步


    对于多个副本的topic来讲,只有分区leader提供读写服务的,follower只是不停的尝试从leader拉取最新的数据到本地,不提供读写服务,跟leader数据保持同步的数据有哪些呢?就是经过ISRl来管理的。async

    ISR全称是“In-Sync Replicas”,就是能跟首领副本基本保持一致的跟随副本,若是同步的速度太慢的话,就会被踢出ISR副本。ide


3.说一下Kafka的Producer端消息传递语义,由参数"acks"控制,分三种:
ui


a.acks=allspa

    意味着当Producer发送消息时,leader接收到消息以后,还必需要求ISR列表里跟leader保持同步的那些follower都要把消息同步过去,才能认为这条消息是写入成功了,这种效率最低,可是可靠性最高。orm


b.acks=1:

    意味着当Producer发送消息时,leader接收到消息并且写入本地磁盘了,就认为成功了,无论他其余的follower有没有同步过去这条消息了,acks默认值为1


c.acks=0:

    意味着当Producer发送消息时,producer发送一次就再也不发送了,不论是否发送成功,这种状况下没有任何的确认,可能存在消息的丢失;这种效率最高,可是可靠性最低



4.Producer端引发数据丢失、重复、乱序的缘由及解决方案


1).Producer发送数据分为同步、异步两种模式,由参数producer.type控制,通常生产采用异步模式批量发送,提升Kafka系统的吞吐率。


a同步模式 producer.type = sync:

    当acks=0,不进行消息接收的确认,那么当网络异常时,就会形成数据丢失,通常生产不建议设置为0;

    当acks=1,在只有leader接收成功并发送ack确认后,leader宕机,副本没有同步完成,也会形成数据丢失

    上面两种数据丢失,咱们能够设置acks=all,保证produce 写入全部副本算成功,效率比较低。

 producer.type = sync  request.required.acks=all



b.异步模式 producer.type = async:

    异步模式下的有个buffer,经过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,若是buffer满了数据尚未发送出去,若是设置的是当即清理模式,风险很大,容易形成数据丢失。通常设置为阻塞模式:queue.enqueue.timeout.ms = -1表示后台消息queue积压到上限后将一直阻塞,直到queue空间释放;

producer.type = asyncrequest.required.acks=1queue.buffering.max.ms=6000queue.buffering.max.messages=10000queue.enqueue.timeout.ms = -1batch.num.messages=500


3).acks = all时,数据发送到 leader 后 ,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。好比 follower1 和 follower2 都有可能变成新的 leader, producer 端会获得返回异常,producer 端会从新发送数据,可能会形成数据重复,这种可经过幂等性和事务解决,后续会讲;


4).acks=all时,当isr列表为空,若是unclean.leader.election.enable为true,则会选择其余存活的副本做为新的leader,也会存在消息丢失的问题

可经过设置参数:

unclean.leader.election.enable=false

    这个参数在0.11.0以前其默认值为true,而以后的版本其默认值为false。意思是,在leader选举时是否在没有活着的ISR副本时从OSR中的最先follower选举,若是为true可能会形成数据的丢失;


5).异步发送消息时,有两条数据数据准备发送到相同的Partition,第一条消息写入失败,第二条消息写入失败,通过重试后第一批次写入成功,这时就会形成发送数据的乱序,这种状况咱们能够经过设置参数进行限制:

参数:max.in.flight.requests.per.connection

    表示请求队列大小,默认5,请求队列中存放的是在发送途中的请求,包括:正在发送的请求和已经发送的但尚未接收到response的请求;请求队列满了,发送消息将会发生阻塞。也就是发往同一个node的最大未响应请求。设置此值是1,表示kafka broker在响应请求以前client不能再向同一个broker发送请求,这样便能够避免消息乱序

    

    这是我总结的几种缘由和处理方式,可能不全,后面若是有了新的领悟,我会进行更新和补充。


5.未完待续


    经过4的分析发现kafka在两端的默认配置都是at least once,可能重复,经过配置也不能作到exactly once,好像kafka的消息必定会丢失或者重复,在kafka 0.11.0.0版本以后,开始引入了幂等性和事务机制来解决上述问题,下一篇文章我将详细讲解幂等性原理及实现剖析。




扩充知识点:

1.Kafka发送数据过快,致使服务器网卡流量暴增。或磁盘过忙,出现丢包,形成。这时候咱们采起如下措施:

     a.首先,对kafka进行限速;

     b.其次启用重试机制,使重试间隔变长;

     c.Kafka设置ack=all,即须要处于ISR(副本列表)的分区都确认,才算发送成功。