干货|kafka流量监控的原理及实现

干货|kafka流量监控的原理及实现

浪院长 浪尖聊大数据 html

工程能力

做为一个优秀的开发人员,项目开发的过程当中监控告警系统的可靠性是能够体现出一我的的工程管理能力的。优秀的监控告警系统能够免去不少精力消耗,好比维护,故障预判,故障及时准确通知,故障定位排查等。数据库

能够想像项目上线后,假如没有监控告警系统,这么一个暗箱是多么可怕。apache

对于大数据项目,数据通常须要先入消息队列,如kafka,而后分离线和实时将数据进行解耦分流,用于实时处理和离线处理。消息队列存在的好处:后端

  • 消息队列的订阅者能够根据须要随时扩展,能够很好的扩展数据的使用者。服务器

  • 消息队列的横向扩展,增长吞吐量,作起来仍是很简单的。这个用传统数据库,分库分表仍是很麻烦的。ide

  • 因为消息队列的存在,也能够帮助咱们抗高峰,避免高峰时期后端处理压力过大致使整个业务处理宕机。
    kafka在大数据项目中做用相当重要,那么对其的监控告警就相当重要了,咱们这里主要是讲针对kafka流量的监控告警,其目的也是很明显的便于咱们了解数据的总体状况及波动状况,以调整处理后端,如spark streaming,flume等。

kafka 监控工具不少,常见的有kafka manager,KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最常用的是kafka manager,也建议你们使用该工具,其不只有监控功能还有管理功能。具体使用方法能够参看:工具

kafka管理神器-kafkamanager测试

监控指标

kafka的指标服务器和客户端都有的。具体指标内容,能够参看kafka官网:大数据

http://kafka.apache.org/0102/documentation.html#monitoringurl

查看可用指标的最简单方法是启动jconsole并将其指向正在运行的kafka客户端或服务器; 这将容许使用JMX浏览全部指标。

对于熟悉kafka manager的朋友都应该看过broker相关信息,好比每秒钟的流入的消息条数,每秒钟的流入的消息大小,流出的消息大小等。

使用kafka manager能够很方便的查看。可是,这其实不能让咱们及时的发现数据流量波动,或者说咱们想画个曲线的详细对比历史流量,它是作不到的。因此,咱们要想办法去获取出来这些指标,而后作咱们本身的展现。还有一点就是,流量波动告警。

浪尖这里只作了图中几个指标的接口:

def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)
 }

 def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)
 }

 def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)
 }

 def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)
 }

 def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)
 }

 def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
   getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)
 }

jmx客户端

链接jmx的server是可使用jconsole,可是知足不了咱们的需求。因此,咱们使用JMXConnectorFactory 方式链接jmx。使用JMXConnectorFactory 连接jmx时,JMXServiceURL 的参数 url 必须使用 service:jmx 方式进行链接,具体连接建立方式很简单,几行代码而已,以下:

val jmxHost = "hostname"
val jmxPort = 9999
val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"
val url = new JMXServiceURL(urlString)
val jmxc = JMXConnectorFactory.connect(url )

val mbsc = jmxc.getMBeanServerConnection;

println(KafkaMetrics.getMessagesInPerSec(Kafka_0_10_2_1,mbsc,Some("test")).fifteenMinuteRate)
jmxc.close()

开启kafka的jmx端口

kafka的jmx服务默认时关闭的,开启的话很简单,只须要在kafka server的启动脚本kafka-server-start.sh里增长一行代码便可,内容export JMX_PORT="9999",增长位置以下:

if [ "x$KAFKA_HEAP_OPTS" = "x"]; then

  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

  export JMX_PORT="9999"
    fi

测试

我这里测试就比较简单了,主要是将消息条数打出来,你们能够根据须要自行调整,好比均值大于阈值发短信告警等。

干货|kafka流量监控的原理及实现

本文是本身实现kafka 监控系统的第二篇文章,前面有篇文章讲到了从kafka broker获取消费者已经提交的offset,具体能够阅读:
如何获取kafka的broker保存的消费者信息?
一套完整的kafka监控,包括:

  1. 消费者监控,主要是存活告警,消费滞后告警。
  2. 生产者监控,主要是存活告警,生产者消费上游数据能力告警。
  3. broker监控,主要是存活告警,流量告警,isr列表,topic异常告警,control变换告警。内容颇多,后面陆续出文章实现,固然整个项目最终会放到星球里的。