kafka原理

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

**1.前言**
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
1.1  Kafka的特性:
- 高吞吐量、低延迟:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒,每一个topic能够分多个partition, consumer group 对partition进行consume操做。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失
- 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败)
- 高并发:支持数千个客户端同时读写

1.2   Kafka的使用场景:
- 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到hadoop、数据仓库中作离线分析和挖掘。
- 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告。
- 流式处理:好比spark streaming和storm

1.3  Kakfa的设计思想
- Kakfa Broker Leader的选举:
  Kakfa Broker集群受Zookeeper管理。全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。
  这个Controller会监听其余的Kafka Broker的全部信息,若是这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时全部的kafka broker又会一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。
  例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上全部的partition在zookeeper上的状态,并选取ISR列表中的一个replica做为partition leader(若是ISR列表中的replica全挂,选一个幸存的replica做为leader; 若是该partition的全部的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,而且选它做为Leader;或选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其余的kafka broker。

- Consumergroup:
  各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若是一个message能够被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不一样的组。
  Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。因此若是想同时对一个topic作消费的话,启动多个consumer group就能够了,可是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。
  当启动一个consumer group去消费一个topic的时候,不管topic里面有多个少个partition,不管咱们consumer group里面配置了多少个consumer thread,这个consumer group下面的全部consumer thread必定会消费所有的partition;即使这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费全部的partition。所以,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
  同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不可以一个consumer group的多个consumer同时消费一个partition。

- Consumer Rebalance的触发条件:
  (1): Consumer增长或删除会触发 Consumer Group的Rebalance(2)Broker的增长或者减小都会触发 Consumer Rebalance
  
- Consumer:
  Consumer处理partition里面的message的时候是o(1)顺序读取的。因此必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由本身维护。通常来讲都是使用high level api的。
  Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也能够配置成读完消息处理再commit,这种状况下consumer端的响应就会比较慢的,须要等处理完才行。
  若是producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增长topic下的partition,同时增长这个consumer group下的consumer。
![image.png](/img/bVcSl2y)

- Delivery Mode:
  Kafka producer 发送message不用维护message的offsite信息,由于这个时候,offsite就至关于一个自增id,producer就尽管发送message就行了。
  可是Consumer端是须要维护这个partition当前消费到哪一个message的offsite信息的,这个offsite信息,high level api是维护在Zookeeper上,low level api是本身的程序维护。
  当使用high level api的时候,先拿message处理,再定时自动commit offsite+1(也能够改为手动), 而且kakfa处理message是没有锁操做的。所以若是处理message失败,此时尚未commit offsite+1,当consumer thread重启后会重复消费这个message。可是做为高吞吐量高并发的实时处理系统,at least once的状况下,至少一次会被处理到,是能够容忍的。若是没法容忍,就得使用low level api来本身程序维护这个offsite信息,那么想何时commit offsite+1就本身搞定了。
  
  - Topic & Partition:
    Topic至关于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪一个topic,可是不须要指定topic下的哪一个partition,由于kafka会把收到的message进行load balance,均匀的分布在这个topic下的不一样的partition上( hash(message) % [broker数量]  )。
    在物理结构上,每一个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic能够有无数多的partition,根据业务需求和数据量来设置。
    在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在建立Topic时经过参数指定parittion数量。Topic建立以后经过Kafka提供的工具也能够修改partiton数量。
     通常来讲,(1)一个Topic的Partition数量大于等于Broker的数量,能够提升吞吐率。(2)同一个Partition的Replica尽可能分散到不一样的机器,高可用。
     当add a new partition的时候,partition里面的message不会从新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会从新参与全部partition的load balance。
     
     - Partition Replica:
       每一个partition能够在其余的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例若有5个kafka broker节点,某个topic有3个partition,每一个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,不然报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其余的就是copy副本)。这样若是某个broker宕机,其实整个kafka内数据依然是完整的。可是,replica副本数越高,系统虽然越稳定,可是回来带资源和性能上的降低;replica副本少的话,也会形成系统丢数据的风险。
       (1)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其余partition follower。
       (2)在向Producer发送ACK前须要保证有多少个Replica已经收到该消息:根据ack配的个数而定。
       (3)怎样处理某个Replica不工做的状况:若是这个部工做的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。若是这个不工做的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工做的partition replca写message成功,可是会等到time out,而后返回失败由于某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工做的partition replica从ack列表中移除,之后的producer发送message的时候就不会有这个ack列表下的这个部工做的partition replica了。
       (4)怎样处理Failed Replica恢复回来的状况:若是这个partition replica以前不在ack列表中,那么启动后从新受Zookeeper管理便可,以后producer发送message的时候,partition leader会继续发送message到这个partition follower上。若是这个partition replica以前在ack列表中,此时重启后,须要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工做的partition replica的时候自动从ack列表中移除的)

     - Partition leader与follower:
       partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其余的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其余的broker的partition follower上选择follower变为parition leader。
      
     - Topic分配partition和partition replica的算法:
       (1)将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上
                       
      - Partition ack:
        当ack=1,表示producer写partition leader成功后,broker就返回成功,不管其余的partition follower是否写成功。当ack=2,表示producer写partition leader和其余一个follower成功的时候,broker就返回成功,不管其余的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer所有写成功的时候,才算成功,kafka broker才返回成功信息。这里须要注意的是,若是ack=1的时候,一旦有个broker宕机致使partition的follower和leader切换,会致使丢数据。                         
         ![image.png](/img/bVcSl4b)
                            
      - message状态:                                            
        在Kafka中,消息的状态被保存在consumer中,broker不会关心哪一个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着若是consumer处理很差的话,broker上的一个消息可能会被消费屡次。                                                   
                                                                    
      - message持久化:                                                                       
        Kafka中会把消息持久化到本地文件系统中,而且保持o(1)极高的效率。咱们众所周知IO读取是很是耗资源的性能也是最慢的,这就是为了数据库的瓶颈常常在IO上,须要换SSD硬盘的缘由。可是Kafka做为吞吐量极高的MQ,却能够很是高效的message持久化到文件。这是由于Kafka是顺序写入o(1)的时间复杂度,速度很是快。也是高吞吐量的缘由。因为message的写入持久化是顺序写入的,所以message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。通常的机器,单机每秒100k条数据。                                                                              
                                                                                               
          https://www.cnblogs.com/cxxjohnson/p/8921661.html