1 为什么要监控
A :对于Kafka消费者,最重要的事情就是监控它们的消费进度(消费的滞后程度)常称为:Consumer Lag
B :Lag的单位是消息数,他直接反映了一个消费者的运行情况。一个正常的消费者的Lag应当很小,设置为0。这表明消费者能够及时地消费生产者生产出来的消息。反之,一个消费者Lag值很大的话表明它无法跟上生产者的速度。
C :如果消费者速度无法匹及生产者的数据,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那些数据就失去了享有Zero Copy技术的条件,不得不从磁盘中读取,进一步拉大了与生产者的差距。并且会越来大。
所以:在实际业务场景中必须时刻关注消费者的消费进度。一旦出现Lag逐步增加的趋势,就要立即定位问题,及时处理,避免问题扩散。
2 如何监控
A :使用Kafka自带的命令行工具kafka-consumer-groups脚本
B :使用Kafka Java Conssumer API编程
C :使用Kafka自带的JMX监控指标
3 方法具体分析
A :Kafka自带命令
(1) kafka-consumer-groups脚本是kafka为我们提供的最直接的监控消费者消费进度工具。
(2) 使用:
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息 > --describe --group <group 名称 >
<Kafka broker 连接信息 >:主机:端口
<group 名称 > :要监控的消费组的 group.id值
(3)展示的信息:主题,分区,该消费者组最新消费消息的位移值(CURRENT-OFFSET值),每个分区当前最新生产的消息的位移值(LOG-END-OFFSET),LAG(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLENT-ID信息。
B :Kafka Java Consumer API
(1)首先获取给定的消费者组的最新消费消息的位移
(2)在获取订阅分区的最新消息位移
(3)最后执行相应的减法操作,获取Lag值并封装进一个Map对象。
C :Kafka JMX监控指标
使用Kafka默认提供 的JMX监控指标来监控消费者的Lag值。
(1)Kafka消费者提供了一个名为Kafka.consumer:type=consumer-fetch-manager-metrics,client-id=”{client-id}”的JMX指标。
(2)有两个重要的属性:records-lag-max 和 records-lead-min 分别表示消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值。
(3)Lead值是指消费者最新消费消息的位移和分区当前第一条消息的位移的差值。即:Lag越大,Lead就越小。
展开