Controller选举
当添加一个分区或分区增长副本的时候,都要从全部副本中选举一个新的Leader出来。html
Leader若是选举?投票怎么玩?是否是全部的partition副本直接发起投票,开始竞选呢?好比用ZK实现。算法
利用ZK如何实现选举?ZK的什么功能能够感知到节点的变化(增长或减小)?或者说ZK为何能实现加锁和释放锁?apache
用到了3个特色:watch机制;节点不容许重复写入;临时节点。网络
这样实现是比较简单,但也会存在必定弊端。若是分区和副本数量过多,全部的副本都直接选举的话,一旦某个节点增减,就会形成大量watch事件被触发,ZK的负载就会太重。架构
kafka早期的版本就是这样作的,后来换了一种实现方式。分布式
不是全部的repalica都参与leader选举,而是由其中的一个Broker统一来指挥,这个Broker的角色就叫作Controller(控制器)。fetch
就像Redis Sentinel的架构,执行故障转移的时候,必需要先从全部哨兵中选一个负责故障转移的节点同样。kafka 也要先从全部Broker中选出惟一的一个Controller。url
全部Broker会尝试在zookeeper中建立临时节点/controller,只有一个能建立成功(先到先得)。.net
若是Controller挂掉了或者网络出现了问题,ZK上的临时节点会消失。其余的Brokder经过watch监听到Controller下线的消息后,开始竞选新的Controller。方法跟以前仍是同样的,谁先在ZK里写入一个/cotroller节点,谁就成为新的Controller。设计
成为Controller节点以后,它的责任也比其余节点重了几分:
- 监听Broker变化
- 监听Topic变化
- 监听Partition变化
- 获取和管理Broker、Topic、Partition的信息
- 管理Partiontion的主从信息
分区副本Leader选举
Controller肯定之后,就能够开始作分区选主的事了。下面就是找候选人了。显然,每一个replica都想推荐本身,但全部的replica都有竞选资格吗?并非,这里有几个概念。
Assigned-Replicas(AR):一个分区的全部副本。 In-Sync Replicas(ISR):上边全部副本中,跟leader数据保持必定程度同步的。 Out-Sync Replicas(OSR):跟leader同步滞后过多的副本。
AR=ISR + OSR。正常状况下OSR是空的,你们正常同步,AR=ISR。
谁能参加选举?确定不是AR,也不是OR,而是ISR。并且这个ISR不是固定不变的,仍是一个动态列表。
前面说过,若是同步延迟超30秒,就踢出ISR,进入OSR;若是遇上来了就加入ISR。
默认状况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader。
若是ISR为空呢?群龙不能无首。在这种状况下,可让ISR以外的副本参与选举。容许ISR以外的副本参与选举,叫作unclean leader election。
unclean.leader.election.enable=false
把这个参数改为true(通常不建议开启,会形成数据丢失)。
Controller有了,候选人也有了ISR,那么根据什么规则肯定leader呢?
咱们首先来看分布式系统中常见的选举协议有哪些(或者说共识算法)?
ZAB(ZK)、Raft(Redis Sentinel)他们都是Paxos算法的变种,核心思想概括起来都是:先到先得、少数服从多数。
但kafka没有用这些方法,而是用了一种本身实现的算法。
为何呢?好比ZAB这种协议,可能会出现脑裂(节点不能互通的时候,出现多个leader)、惊群效应(大量watch事件被触发)。
在文档中有说明:
https://kafka.apachecn.org/documentation.html#design_replicatedlog
提到kafka的选举实现,最相近的是微软的PacificA算法。
在这种算法中,默认是让ISR中第一个replica变成leader。像中国皇帝传位同样,优先传给皇长子。
主从同步
leader肯定以后,客户端的读写只能操做leader节点。follower须要向leader同步数据。
不一样的raplica的offset是不同的,同步到底怎么同步呢?
在以后内容,须要先理解几个概念。
LEO(Log End Offset):下一条等待写入的消息的offset(最新的offset + 1)。
HW(Hign Watermark 高水位):ISR中最小的LEO。Leader会管理全部ISR中最小的LEO为HW。
consumer最多只能消费到HW以前的位置。也就是说,其余副本没有同步过去的消息,是不能被消费的。
kafka为何这么设计?
若是在同步成功以前就被消费了,consumer group 的offset会偏大,若是leader崩溃,中间会丢失消息。
接着再看消息是如何同步的。
Replica 1与Replica2各同步了1条数据,HW推动了1,变成了7,LEO因Replica2推动了1,变成了7。
Replica 1与Replica2各同步了2条数据,HW和LEO重叠,都到了9。
在这须要了解一下,从节点如何与主节点保持同步?
- follower节点会向Leader发送一个fetch请求,leader向follower发送数据后,即须要更新follower的LEO。
- follower接收到数据响应后,依次写入消息而且更新LEO。
- leader更新HW(ISR最小的LEO)
kafka设计了独特的ISR复制,能够在保障数据一致性状况下又能够提供高吞吐量。
Replica故障处理
follower故障
首先follower发生鼓掌,会被先踢出ISR。
follower恢复以后,从哪开始同步数据呢?
假设Replica1宕机。
恢复之后,首先根据以前的记录的HW(6),把高于HW的消息截掉(六、7)。
而后向Leader同步消息。追上Leader以后(30秒),从新加入ISR。
leader故障
还以上图为例,若是图中Leader发生故障。
首先选一个Leader,由于Replica1优先,它将成为Leader。
为了保证数据一致,其余follower须要把高于HW的消息截掉(这里没有消息须要截取)。
而后Replica2同步数据。
此时原Leader中的数据8将丢失。
注意:这种机制只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。