Kafka Cluster Installation and Implementation: Kafka 2.5.0 + RedHat 7.5 + Zookeeper

1. Overview

1.1 Kafka Concepts

1) Apache Kafka is an open-source messaging system written in Scala, developed as a project of the Apache Software Foundation.

2) Kafka was originally developed at LinkedIn and open-sourced in early 2011; it graduated from the Apache Incubator in October 2012. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data.

3) Kafka is a distributed message queue. Kafka organizes stored messages by Topic; message senders are called Producers and message receivers are called Consumers. A Kafka cluster consists of multiple Kafka instances, and each instance (server) is called a broker.

4) Both the Kafka cluster and its consumers depend on a ZooKeeper ensemble to store metadata and keep the system available.

1.2 Kafka Cluster

A Kafka cluster consists of multiple Kafka brokers, each with a unique ID (number). The brokers hold the topic log partitions. For fault tolerance you need at least three to five servers, and a cluster can grow to 10, 100, or even 1,000 servers if required.

Kafka keeps its cluster state in ZooKeeper. A ZooKeeper ensemble can only serve requests while more than half of its nodes are up, so the minimum configuration is three nodes: two out of three constitute a majority, allowing one node to fail.

A Kafka cluster likewise requires at least three servers. This implementation guide is based on three servers, planned in the tables below.

2. System Preparation and Planning

Resource planning and notes

Memory
Recommendation: A system with 64GB of RAM is recommended; 32GB also works well, but less than 32GB tends to perform poorly. Kafka uses heap space very carefully and does not need a heap larger than 6GB; on a 32GB machine this leaves roughly 28-30GB for the filesystem cache.
Note: 32GB or more

CPU
Recommendation: Most Kafka deployments are not particularly CPU-bound, so the processor matters less than the other resources, although CPU requirements rise if SSL is enabled (the exact impact depends on the CPU type and JVM implementation). Choose a modern processor with multiple cores. If you must choose between a faster CPU and more cores, choose more cores: the extra concurrency that multiple cores provide far outweighs a slightly faster clock speed.
Note: multi-core CPU

Storage
Recommendation: Do not share the drives used for Kafka data with application logs or other OS filesystem activity, so as to ensure good latency. You can either combine the drives into a single volume (RAID) or format and mount each drive as its own directory. Since Kafka has built-in replication, the redundancy RAID provides can also be achieved at the application level, so this choice involves several trade-offs:

1. If you configure multiple data directories, the broker places each new partition in the directory that currently holds the fewest partitions; each partition lives entirely in one data directory. If data is not balanced across partitions, load can become unbalanced across disks.

2. RAID may do a better job of balancing load across disks because it balances at a lower level. The main downside of RAID is reduced usable disk space; a potential benefit is tolerance of disk failures. RAID 5 and RAID 6 are not recommended because of the significant hit to write throughput and, to a lesser extent, the I/O cost of rebuilding the array after a disk failure (rebuild cost applies to RAID in general, but is worst for RAID 5 and RAID 6).

3. If the extra cost is acceptable, use RAID 10; otherwise, configure multiple log directories for your Kafka servers, each mounted on a separate drive.

4. Avoid network-attached storage (NAS): it is typically slower, has higher and more variable latency, and is a single point of failure.

5. Disk throughput (IOPS, reads/writes per second) affects producer performance: produced messages must be committed to the server, and most clients wait until at least one broker confirms that the commit succeeded. The faster the disk writes, the lower the produce latency.
Note: Dedicated storage recommended; size according to data volume, 500GB suggested.

Network
Recommendation: A fast, reliable network is an essential performance component of any distributed system. Low latency keeps nodes communicating easily, while high bandwidth helps with replica movement and recovery. Modern data-center networking (1GbE, 10GbE) is sufficient for the vast majority of clusters. Network throughput caps the maximum traffic Kafka can handle and, together with disk, is the main factor limiting how far a Kafka cluster can scale: producers writing and consumers reading both share the bandwidth, and cluster replication also consumes a significant amount of it.
Note: 1GbE or faster

File system
Recommendation: Kafka has no special filesystem requirements; common filesystems such as XFS, ext4, and NTFS can all run Kafka.
Note: ext4 or XFS

Server planning

Host names and IP addresses

Server name    IP                Notes
prdserver1     192.168.88.115    Zookeeper server1, Kafka server1
prdserver2     192.168.88.117    Zookeeper server2, Kafka server2
prdserver3     192.168.88.119    Zookeeper server3, Kafka server3

Operating system version

There are no special operating system requirements; a recent Linux release is recommended to reduce the chance of triggering known bugs.

Operating system version: Red Hat Enterprise Linux Server release 7.5 (Maipo)

Directory planning

Item        Directory         Permissions
Zookeeper   /opt/zookeeper/   -
kafka       /opt/kafka        -

Users and groups

Group    User    Description
zk       zk      ZooKeeper installation user and group
kafka    kafka   Kafka installation user and group

3. Installation

Installing the ZooKeeper cluster

Since the same three servers form the ZooKeeper ensemble, the following installation steps must be performed on all three servers.

JDK installation and base environment preparation

Version notes

From a security standpoint, use the latest release of JDK 1.8, since older free versions have publicly disclosed security vulnerabilities. LinkedIn currently runs JDK 1.8 u5 with the G1 collector (and hopes to upgrade to a newer release). LinkedIn's tuning looks like this:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC

-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M

-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
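These flags apply to the Kafka broker JVM rather than to the JDK itself. If you want to carry them into the broker started later in this guide, one way (a sketch; the values simply mirror the LinkedIn settings quoted above) is to export Kafka's standard environment variables, which kafka-server-start.sh and kafka-run-class.sh read, before launching the broker:

# sketch: set before running kafka-server-start.sh; adjust to your hardware
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 \
  -XX:MaxMetaspaceFreeRatio=80"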

JDK download and installation

JDK download:

https://www.oracle.com/java/technologies/javase-downloads.html

#mkdir -p /usr/java

#mv /tmp/jdk-8u241-linux-x64.tar.gz /usr/java

#cd /usr/java

#tar -zxvf jdk-8u241-linux-x64.tar.gz

#cd /usr/java/jdk1.8.0_241

#ls -l

total 25988

drwxr-xr-x. 2 10143 10143     4096 Dec 11 18:35 bin

-r--r--r--. 1 10143 10143     3244 Dec 11 18:35 COPYRIGHT

drwxr-xr-x. 3 10143 10143      132 Dec 11 18:35 include

-rw-r--r--. 1 10143 10143  5217333 Dec 11 15:41 javafx-src.zip

drwxr-xr-x. 5 10143 10143      185 Dec 11 18:35 jre

drwxr-xr-x. 5 10143 10143      245 Dec 11 18:35 lib

-r--r--r--. 1 10143 10143       44 Dec 11 18:35 LICENSE

drwxr-xr-x. 4 10143 10143       47 Dec 11 18:35 man

-r--r--r--. 1 10143 10143      159 Dec 11 18:35 README.html

-rw-r--r--. 1 10143 10143      424 Dec 11 18:35 release

-rw-r--r--. 1 10143 10143 21078837 Dec 11 18:35 src.zip

-rw-r--r--. 1 10143 10143   116400 Dec 11 15:41 THIRDPARTYLICENSEREADME-JAVAFX.txt

-r--r--r--. 1 10143 10143   169788 Dec 11 18:35 THIRDPARTYLICENSEREADME.txt

JDK environment variable configuration

JDK deployment directory: /usr/java/jdk1.8.0_241

Edit the system profile:

vi /etc/profile

Append the following at the end:

JAVA_HOME=/usr/java/jdk1.8.0_241
CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH

Save, exit, then reload:

source /etc/profile

Unless the customer specifies otherwise, deploy the JDK under /usr/java (create the directory if it does not exist).

The JDK can also be configured per user for the zookeeper and kafka installation users by adding the same lines to <user_home>/.bash_profile, as sketched below.
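A minimal sketch of that per-user variant (run as the zk or kafka user; assumes the same JDK path as above):

$ cat >> ~/.bash_profile <<'EOF'
JAVA_HOME=/usr/java/jdk1.8.0_241
CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH
EOF
$ source ~/.bash_profile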

Checking the JDK version

# java -version

java version "1.8.0_241"

Java(TM) SE Runtime Environment (build 1.8.0_241-b07)

Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)

Adding the user and group

# groupadd zk

# useradd -g zk zk

# passwd zk

Creating directories

Run as root:

#chmod 777 /opt
#su - zk

Then, as the zk user:

$ cd /opt
$ mkdir zookeeper

Downloading ZooKeeper

Download the software:

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

Upload and extract

Upload apache-zookeeper-3.6.1-bin.tar.gz to /opt/software on each server.

#cd /opt/software
#tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#mv apache-zookeeper-3.6.1-bin /opt/zookeeper/zookeeper-3.6.1

Editing the configuration file

#cd /opt/zookeeper/zookeeper-3.6.1/conf

#ll

total 12

-rw-r--r--. 1 zk zk  535 Apr 21 22:59 configuration.xsl

-rw-r--r--. 1 zk zk 3435 Apr 21 22:59 log4j.properties

-rw-r--r--. 1 zk zk 1148 Apr 21 22:59 zoo_sample.cfg

# zoo_sample.cfg is the sample configuration file that ships with ZooKeeper.
# Copy it to zoo.cfg; zoo.cfg is the file name ZooKeeper expects.

#cp zoo_sample.cfg zoo.cfg

Edit zoo.cfg:

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just

# example sakes.

dataDir=/opt/zookeeper/zkdata

# the port at which the clients will connect

clientPort=2181

# the maximum number of client connections.

# increase this if you need to handle more clients

#maxClientCnxns=60

#

# Be sure to read the maintenance section of the

# administrator guide before turning on autopurge.

#

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#

# The number of snapshots to retain in dataDir

#autopurge.snapRetainCount=3

# Purge task interval in hours

# Set to "0" to disable auto purge feature

#autopurge.purgeInterval=1

## Metrics Providers

#

# https://prometheus.io Metrics Exporter

#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider

#metricsProvider.httpPort=7000

#metricsProvider.exportJvmInfo=true

dataLogDir=/opt/zookeeper/zkdatalog

server.1=192.168.88.115:2888:3888

server.2=192.168.88.117:2888:3888

server.3=192.168.88.119:2888:3888

 

#The number of snapshots to retain in dataDir

autopurge.snapRetainCount=20

#Purge task interval in hours

#Set to "0" to disale auto purge feature

autopurge.purgeInterval=48

Notes:

# server.1: the 1 is the server identifier (any number may be used); it marks which server this is, and the same identifier must be written to the myid file under the snapshot (dataDir) directory.

# 192.168.88.115 is the node's IP within the ensemble. The first port (default 2888) is used for communication between the leader and the followers; the second port (default 3888) is used for leader election, both when the cluster first starts and whenever the current leader fails.
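One practical note: the three servers must be able to reach each other on ports 2181, 2888, and 3888. If firewalld is enabled on RHEL 7.5 (an assumption; many sites disable it), a sketch of opening the ports, run as root on each server:

#firewall-cmd --permanent --add-port=2181/tcp   # client port
#firewall-cmd --permanent --add-port=2888/tcp   # follower-to-leader traffic
#firewall-cmd --permanent --add-port=3888/tcp   # leader election
#firewall-cmd --reload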

 

Creating the myid files

#mkdir -p /opt/zookeeper/zkdata/

#mkdir -p /opt/zookeeper/zkdatalog

Create the myid file on each of the three servers:

#prdserver1
#echo "1" > /opt/zookeeper/zkdata/myid

#prdserver2
#echo "2" > /opt/zookeeper/zkdata/myid

#prdserver3
#echo "3" > /opt/zookeeper/zkdata/myid

Log file configuration

#cd /opt/zookeeper/zookeeper-3.6.1

#mkdir logs

#vi /opt/zookeeper/zookeeper-3.6.1/conf/log4j.properties

zookeeper.root.logger=INFO, CONSOLE

zookeeper.console.threshold=INFO

 

zookeeper.log.dir=/opt/zookeeper/zookeeper-3.6.1/logs

zookeeper.log.file=zookeeper.log

zookeeper.log.threshold=INFO

zookeeper.log.maxfilesize=256MB

zookeeper.log.maxbackupindex=20

 

zookeeper.tracelog.dir=${zookeeper.log.dir}

zookeeper.tracelog.file=zookeeper_trace.log

……

Starting the service and checking status

# change to the ZooKeeper bin directory
cd /opt/zookeeper/zookeeper-3.6.1/bin

# start the service (run on all 3 servers)
./zkServer.sh start

----------------------------------------------------------

[zk@prdserver1 bin]$ ./zkServer.sh start

/usr/local/java/jdk1.8/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

 

[zk@prdserver1 bin]$ ./zkServer.sh status

/usr/local/java/jdk1.8/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg

Client port found: 2181. Client address: localhost.

Mode: follower

 

[zk@prdserver2 bin]$ ./zkServer.sh status

/usr/local/java/jdk1.8/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg

Client port found: 2181. Client address: localhost.

Mode: leader

 

[zk@prdserver3 bin]$ ./zkServer.sh status

/usr/local/java/jdk1.8/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg

Client port found: 2181. Client address: localhost.

Mode: follower

Check the ZooKeeper process:

[zk@prdserver1 bin]$ ps -ef |grep zookeeper

zk        2216     1  0 15:39 pts/0    00:00:03 java -Dzookeeper.log.dir=/opt/zookeeper/zookeeper-3.6.1/bin/../logs -Dzookeeper.log.file=zookeeper-zk-server-prdserver1.log -Dzookeeper.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /opt/zookeeper/zookeeper-3.6.1/bin/../zookeeper-server/target/classes:/opt/zookeeper/zookeeper-3.6.1/bin/../build/classes:/opt/zookeeper/zookeeper-3.6.1/bin/../zookeeper-server/target/lib/*.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../build/lib/*.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-prometheus-metrics-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-jute-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/zookeeper-3.6.1.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/snappy-java-1.1.7.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper/zookeeper-3.6.1/bin/../lib/simplec
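As an optional sanity check, the same role information can be read over the client port with ZooKeeper's four-letter-word interface (a sketch; assumes nc is installed, and note that ZooKeeper 3.6 whitelists only the srvr command by default):

$ echo srvr | nc 192.168.88.115 2181
...
Mode: follower    # exactly one of the three nodes should report Mode: leader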

ZooKeeper installation is now complete.

 

Installing the Kafka cluster

Environment preparation

JDK installation

See "JDK installation and base environment preparation" above.

Users and groups

Group    User     Description
kafka    kafka    Domain user group or application user group

# groupadd kafka

# useradd -g kafka kafka

# passwd kafka

File paths

Run as root:

su - kafka

Then, as the kafka user:

cd /opt
mkdir kafka
mkdir -p /opt/kafka/kafkalogs/

Downloading Kafka

# Download the software

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz

# Upload and extract

Upload kafka_2.12-2.5.0.tgz to /opt/software on each server.

[kafka@prdserver1 software]$ tar -zxvf kafka_2.12-2.5.0.tgz
[kafka@prdserver1 software]$ mv kafka_2.12-2.5.0 /opt/kafka/kafka2.5.0

Editing the configuration files

Focus mainly on server.properties. There are many files under the config directory, including ZooKeeper files: Kafka can be started with its bundled ZooKeeper, but this document uses the standalone ZooKeeper cluster installed earlier.

Parameter notes:

broker.id=0  # unique identifier of this machine within the cluster, same idea as ZooKeeper's myid

port=9092  # port on which this Kafka broker serves clients; default 9092

host.name=192.168.88.115  # disabled by default; in 0.8.1 there was a bug involving DNS resolution failures

num.network.threads=3  # number of threads the broker uses for network processing

num.io.threads=8  # number of threads the broker uses for disk I/O

log.dirs=/opt/kafka/kafkalogs/  # directory where messages are stored; may be a comma-separated list of directories (num.io.threads should be no smaller than the number of directories); with multiple directories, each new topic partition is persisted to whichever directory currently holds the fewest partitions

socket.send.buffer.bytes=102400  # send buffer size; data is accumulated in the buffer and sent once it reaches a threshold, which improves performance

socket.receive.buffer.bytes=102400  # receive buffer size; data is buffered until it reaches a threshold before being written to disk

socket.request.max.bytes=104857600  # maximum size of a request sent to or fetched from Kafka; must not exceed the JVM heap size

num.partitions=1  # default number of partitions per topic

log.retention.hours=168  # default maximum message retention time: 168 hours, i.e. 7 days

message.max.bytes=5242880  # maximum message size: 5MB

default.replication.factor=2  # number of replicas Kafka keeps of each message; if one replica fails, the other can continue serving

replica.fetch.max.bytes=5242880  # maximum number of bytes fetched per replica request

log.segment.bytes=1073741824  # Kafka appends messages to segment files on disk; when a segment exceeds this size, a new file is rolled

log.retention.check.interval.ms=300000  # every 300000 ms (5 minutes), check whether any segments have exceeded the retention time (log.retention.hours=168) and delete those that have expired

log.cleaner.enable=false  # whether to enable log compaction; usually left disabled unless compacted topics are needed

zookeeper.connect=192.168.88.115:2181,192.168.88.117:2181,192.168.88.119:2181  # ZooKeeper connection string

 

Using node 192.168.88.115 as an example, go to the config directory:

cd /opt/kafka/kafka2.5.0/config

vi server.properties

############################# Server Basics #############################

 

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0 (note: set this to 1 on 192.168.88.117 and to 2 on 192.168.88.119)

############################# Socket Server Settings #############################

 

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

#   FORMAT:

#     listeners = listener_name://host_name:port

#   EXAMPLE:

#     listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.88.115:9092 (note: adjust the IP accordingly on the other two servers)

 

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured.  Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners=PLAINTEXT://192.168.88.115:9092 (note: adjust the IP accordingly on the other two servers)

 

############################# Log Basics #############################

 

# A comma separated list of directories under which to store log files

#log.dirs=/tmp/kafka-logs

log.dirs=/opt/kafka/kafkalogs/

 

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=3

 

############################# Log Retention Policy #############################

 

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

 

# The minimum age of a log file to be eligible for deletion due to age

log.retention.hours=168

 

message.max.bytes=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

 

############################# Zookeeper #############################

 

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

#zookeeper.connect=localhost:2181

zookeeper.connect=192.168.88.115:2181,192.168.88.117:2181,192.168.88.119:2181
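As with the ZooKeeper ports, if firewalld is enabled (an assumption), the broker port must be opened on each server, run as root:

#firewall-cmd --permanent --add-port=9092/tcp
#firewall-cmd --reload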

 

Edit the consumer configuration file:

vi consumer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#    http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

 

# list of brokers used for bootstrapping knowledge about the rest of the cluster

# format: host1:port1,host2:port2 ...

#bootstrap.servers=localhost:9092

bootstrap.servers=192.168.88.115:9092,192.168.88.117:9092,192.168.88.119:9092

# consumer group id

group.id=test-consumer-group

 

# What to do when there is no initial offset in Kafka or if the current

# offset does not exist any more on the server: latest, earliest, none

#auto.offset.reset=
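Once a consumer is running, the group configured above can be inspected with Kafka's bundled tool (a sketch; run from the Kafka bin directory after the cluster is started):

$ ./kafka-consumer-groups.sh --bootstrap-server 192.168.88.115:9092 --list
$ ./kafka-consumer-groups.sh --bootstrap-server 192.168.88.115:9092 --describe --group test-consumer-group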

 

Edit the producer configuration file:

[kafka@prdserver1 config]$ vi producer.properties

……

############################# Producer Basics #############################

 

# list of brokers used for bootstrapping knowledge about the rest of the cluster

# format: host1:port1,host2:port2 ...

#bootstrap.servers=localhost:9092

bootstrap.servers=192.168.88.115:9092,192.168.88.117:9092,192.168.88.119:9092

……

 

Starting and testing the Kafka cluster

Starting Kafka

Start the Kafka cluster in the background (all 3 servers must be started).

Go to Kafka's bin directory and run the start command.

[kafka@prdserver1 bin]$ cd /opt/kafka/kafka2.5.0/bin

[kafka@prdserver1 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties

[kafka@prdserver1 bin]$ ps -ef |grep kafka

kafka     5218     1  8 17:48 pts/1    00:00:02 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/opt/kafka/kafka2.5.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/kafka2.5.0/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp .:/usr/local/java/jdk1.8/lib:/usr/local/java/jdk1.8/jre/lib:/opt/kafka/kafka2.5.0/bin/../libs/activation-1.1.1.jar:/opt/kafka/kafka2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/commons-cli-1.4.jar:/opt/kafka/kafka2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-api-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/opt/kafka/kafka2.5.0/bin/../libs/connect-file-2.5.0.

Checking the Kafka logs

[kafka@prdserver1 logs]$ cd /opt/kafka/kafka2.5.0/logs

[kafka@prdserver1 logs]$ tail -f server.log

[2020-07-29 18:04:33,020] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)

[2020-07-29 18:04:33,027] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)

[2020-07-29 18:04:33,051] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)

[2020-07-29 18:04:33,094] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)

[2020-07-29 18:04:33,151] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)

[2020-07-29 18:04:33,170] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)

[2020-07-29 18:04:33,175] INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser)

[2020-07-29 18:04:33,175] INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser)

[2020-07-29 18:04:33,175] INFO Kafka startTimeMs: 1596017073171 (org.apache.kafka.common.utils.AppInfoParser)

[2020-07-29 18:04:33,177] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
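To confirm that all three brokers registered with ZooKeeper, the broker IDs can be listed from any ZooKeeper node (a sketch using the zkCli.sh shipped with ZooKeeper; with all three brokers up, expect [0, 1, 2]):

$ /opt/zookeeper/zookeeper-3.6.1/bin/zkCli.sh -server 192.168.88.115:2181 ls /brokers/ids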

Testing message production and consumption

Create a topic (First_kafka_project).

[kafka@prdserver1 bin]$ pwd

/opt/kafka/kafka2.5.0/bin

[kafka@prdserver1 bin]$ ./kafka-topics.sh --create --zookeeper 192.168.88.115:2181 --replication-factor 2 --partitions 3 --topic First_kafka_project

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic First_kafka_project.

# Explanation

--replication-factor 2   # keep 2 replicas of the data

--partitions 3           # create 3 partitions

--topic                  # the topic name is First_kafka_project
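The new topic's partition layout can be verified right away with the same tool (a sketch; each of the 3 partitions should show a leader and 2 replicas spread across the brokers):

[kafka@prdserver1 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.88.115:2181 --topic First_kafka_project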

 

Create a producer on 192.168.88.117 and send a few test messages.

[kafka@prdserver2 bin]$ ./kafka-console-producer.sh --broker-list 192.168.88.117:9092 --topic First_kafka_project

>test

>test

>myfirst project

>today is Tuesday!

>end of the message

>

 

Create a consumer on 192.168.88.119 and check that it receives the messages.

[kafka@prdserver3 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.88.119:9092 --topic First_kafka_project --from-beginning

test

test

myfirst project

today is Tuesday!

today is Tuesday!

end of the message

The messages were received successfully on 192.168.88.119, confirming that message production and consumption both work.

 

Kafka cluster high-availability test

Create a topic (Second_kafka_project).

View its details; for example, check the status of the Second_kafka_project topic:

[kafka@prdserver1 bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic Second_kafka_project

Topic: Second_kafka_project  PartitionCount: 3    ReplicationFactor: 3        Configs:

        Topic: Second_kafka_project  Partition: 0      Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

        Topic: Second_kafka_project  Partition: 1      Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

        Topic: Second_kafka_project  Partition: 2      Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

From this output we can see that the leader of Partition 0 is on the broker.id=1 node, its replicas are on brokers 1, 2, and 0, and all replicas are alive and in sync with broker.id=1.

 

Now kill the Kafka process on the broker.id=1 node (192.168.88.117). The expected result is that broker.id=1 no longer leads any partition and drops out of the in-sync replica (ISR) sets.

[kafka@prdserver2 bin]$ ps -ef |grep kafka

kafka     5136     1  2 18:04 pts/0    00:02:04 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/opt/kafka/kafka2.5.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:Numb

[kafka@prdserver2 bin]$ kill -9 5136

[kafka@prdserver2 bin]$ ps -ef |grep 5136

kafka     9976  3429  0 19:22 pts/0    00:00:00 grep --color=auto 5136

[kafka@prdserver2 bin]$

Check the topic status again: node 1 no longer leads any partition and has dropped out of every ISR.

[kafka@prdserver2 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.88.117:2181 --topic Second_kafka_project

Topic: Second_kafka_project  PartitionCount: 3    ReplicationFactor: 3        Configs:

        Topic: Second_kafka_project  Partition: 0      Leader: 2 Replicas: 1,2,0 Isr: 2,0

        Topic: Second_kafka_project  Partition: 1      Leader: 2 Replicas: 2,0,1 Isr: 2,0

        Topic: Second_kafka_project  Partition: 2      Leader: 0 Replicas: 0,1,2 Isr: 0,2

 

Re-test message production and consumption

The results are below: messages are still produced and consumed normally. The consumer goes through a brief switchover while it reconnects to the surviving brokers.

[kafka@prdserver2 bin]$ ./kafka-console-producer.sh --broker-list 192.168.88.117:9092 --topic Second_kafka_project

>test second topic!

>before kill 192.168.88.117....

>after kill 192.168.88.117....

>Test send message again after kill broker1 1 minutes later

>

[kafka@prdserver3 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.88.119:9092 --topic Second_kafka_project --from-beginning

test second topic!

before kill 192.168.88.117....

[2020-07-29 19:21:21,057] WARN [Consumer clientId=consumer-console-consumer-15949-1, groupId=console-consumer-15949] Connection to node 1 (/192.168.88.117:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

…..

[2020-07-29 19:21:34,428] WARN [Consumer clientId=consumer-console-consumer-15949-1, groupId=console-consumer-15949] Connection to node 1 (/192.168.88.117:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

after kill 192.168.88.117....

Test send message again after kill broker1 1 minutes later
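To finish the failover exercise, the killed broker can be restarted on 192.168.88.117 and the topic re-inspected; broker 1 should rejoin every ISR (a sketch; the preferred-leader election step is optional and uses the kafka-leader-election.sh tool shipped with Kafka 2.5):

[kafka@prdserver2 bin]$ cd /opt/kafka/kafka2.5.0/bin
[kafka@prdserver2 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties
[kafka@prdserver2 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.88.117:2181 --topic Second_kafka_project
[kafka@prdserver2 bin]$ ./kafka-leader-election.sh --bootstrap-server 192.168.88.115:9092 --election-type preferred --all-topic-partitions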

 

Summary

After the cluster installation is complete, tune the relevant parameters according to the specific needs of the project.