Kafka底层原理及面试题(三)

3. Kafka原理

3.1 Leader 和 Follower

​ 在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower。在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。我们正常使用Kafka是感觉不到leader、follower的存在的。

​ 但其实,所有的读写操作都是由leader处理的,而所有的follower都是复制leader的日志数据文件,如果leader出现故障时,follower就会选举为leader。

​ 所以说

  • Kafka中的leader负责读写操作,而follower只负责副本数据的同步。
  • 如果leader出现故障,其他follower会被重新选举为leader
  • follower像一个consumer一样,拉取leader对应分区的数据,并保存在日志数据文件中.

leader职责 读写数据

follower职责 同步数据、参与选举(leader crash之后,会选举一个follower重新成为分区的leader)

3.1.1 Kafka中的leader和follower 与zookeeper的区别

答:

  • zookeeper的leader负责读、写;follower可以读取,但不能写

  • Kafka的leader 负责读、写;follower不能读写数据,而是作为每个消费者消费的数据一致的。

  • Kafka的一个topic可以有多个分区,一样可以实现数据操作的负载均衡。

3.2 在kafka中ISR是什么?

在zk中会保存AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR

  • ISR(in sync replica):是kafka动态维护的一组同步副本,在ISR中有成员存活时,只有这个组的成员才可以成为leader,内部保存的为每次提交信息时必须同步的副本(acks = all时),每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务,当ISR中的副本被认为坏掉的时候,会被踢出ISR,当重新跟上leader的消息数据时,重新进入ISR。可以理解为在follower集合中,有多少结点是存活的。

    image-20201103203410023

  • OSR(out sync replica): 保存的副本不必保证必须同步完成才进行确认,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。

3.3 Kafka Controller介绍

在Kafka早期版本,对于分区和副本的状态的管理依赖于zookeeper的Watcher和队列:每一个broker都会在zookeeper注册Watcher,所以zookeeper就会出现大量的Watcher, 如果宕机的broker上的partition比较多,会造成多个Watcher触发,造成集群内大规模调整;每一个replica都要去再次zookeeper上注册监视器,当集群规模很大的时候,zookeeper负担很重。这种设计很容易出现脑裂和羊群效应以及zookeeper集群过载。

新版本该变了这种设计,使用KafkaController,只有Kafka Controller Leader会向zookeeper上注册Watcher,其他broker几乎不用监听zookeeper的状态变化。

而Kafka集群中多个broker,每次只有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作。

当broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader

  • controller 选举 partition leader 的步骤
    1. 所有的partition的leader选举都是由controller决定的
    2. controller会将leader的改变直接通过RPC(远程过程调用)的方式通知需要为此做出响应的Broker
    3. controller会读取到当前分区的ISR,只要 有一个Replica还幸存,就选择其中一个作为leader,否则任意选择一个Replica作为leader。
    4. 如果该partition的所有Replica都已经宕机,则新的leader为-1

3.4 问:为什么不能通过ZK的方式选举partition的leader?

  1. Kafka集群如果业务很多的情况下,会有很多partition。
  2. 加入某个broker宕机,就会出现很多partition都需要重新选举leader。
  3. 如果使用zookeeper选举leader,就会给leader带来家巨大的压力,所以Kafka的leader的选举不能使用ZK来实现。

3.5 问:对于kafka节点活着的条件是什么?

  • 第一点:一个节点必须维持和zk的会话,通过zk的心跳检测实现
  • 第二点:如果节点是一个slave也就是复制节点,那么他必须复制leader节点不能太落后。这里的落后可以指两种情况
    • 1:数据复制落后,slave节点和leader节点的数据相差较大,这种情况有一个缺点,在生产者突然发送大量消息导致网络堵塞后,大量的slav复制受阻,导致数据复制落后被大量的踢出ISR。
    • 2:时间相差过大,指的是slave向leader请求复制的时间距离上次请求相隔时间过大。通过配置replica.lag.time.max就可以配置这个时间参数。这种方式解决了上述第一种方式导致的问题。

3.6 问:什么原因导致副本与leader不同步的呢?

导致副本follower和leader不同步的原因主要有3种情况:

  1. 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
  2. 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
  3. 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

一个partition的follower落后于leader足够多时,被认为不在同步副本列表或处于滞后状态。

正如上述所说,现在kafka判定落后有两种,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replicas响应partition leader的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,而后者是用来检测失效或死亡的副本

3.7 问:Data Replication何时Commit?

何时提交,这与配置选项acks相关,

同步复制(acks=all): 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制(acks=1): 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
**Commit:**是指leader告诉客户端,这条数据写成功了。kafka尽量保证commit后立即leader挂掉,其他flower都有该条数据。

kafka不是完全同步,也不是完全异步,是一种ISR机制:

  1. leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护

  2. 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除

  3. 当ISR中所有Replica都向Leader发送ACK时,leader才commit

既然所有Replica都向Leader发送ACK时,leader才commit,那么flower怎么会leader落后太多?
producer往kafka中发送数据,不仅可以一次发送一条数据,还可以发送message的数组;批量发送,同步的时候批量发送,异步的时候本身就是就是批量;底层会有队列缓存起来,批量发送,对应broker而言,就会收到很多数据(假设1000),这时候leader发现自己有1000条数据,flower只有500条数据,落后了500条数据,就把它从ISR中移除出去,这时候发现其他的flower与他的差距都很小,就等待;如果因为内存等原因,差距很大,就把它从ISR中移除出去。

3.8 问:Data Replication如何处理Replica全部宕机

面对replica宕机时,Kafka的处理方式主要有两种:

1、等待ISR中任一Replica恢复,并选它为Leader

  1. 等待时间较长,降低可用性
  2. 或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用

2、选择第一个恢复的Replica为新的Leader,无论它是否在ISR中

  1. 并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失
  2. 可用性较高

3.9 生产者写入数据的流程

image-20201104151220747

  1. 生产者先从 zookeeper 的 “/brokers/topics/主题名/partitions/分区名/state”节点找到该partition的leader;
  2. 生产者在ZK中找到该ID,然后找到对应的brokers;
  3. broker进程上的leader将消息写入到本地log中;
  4. follower从leader上拉去消息,写入到本地log,并向leader 发送 ACK;
  5. leader收到所有的 ISR 中的Replica 的ACK后,并向生产者返回ACK。

3.10 消费者消费数据的流程

Kafka采用的是拉取模型,由消费者自己记录消费状态,每个消费者相互独立地顺序拉去每个分区的消息。消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳转到最近的为位置,从当前的时刻开始消费。

image-20201104152532203

  1. 因为broker的节点都是在zookeeper上注册的,所以第一步依然是从zookeeper中获取partition对应的leader位置;因为是第一次读取数据,所以需要配置好是从头开始读取数据还是从上一次的位置进行读取,即配置好offset**(默认是从ZK中获取上次消费的offset)**,拿到了offset就可以从指定的位置开始读取数据。
  2. 找到分区的leader,从offset开始往后顺序拉取数据
  3. 提交offset,保存在ZK中,如果ZK没有保存就会导致下次的重复消费。提交offset通常有两种方式:1.手动提交,放到事务中提交;2.自动提交,每个多少秒提交一次offset。