12 | 客户端都有哪些不常见但是很高级的功能?
该思维导图由 AI 生成,仅供参考
什么是拦截器?
- 深入了解
- 翻译
- 解释
- 总结
Kafka拦截器是一种高级功能,允许在消息处理的前后动态插入特定逻辑。这种设计思路借鉴了Spring Interceptor和Apache Flume的概念。Kafka拦截器分为生产者和消费者拦截器,支持在消息发送前、发送后、消费前和提交位移后插入特定逻辑。这些拦截器可以通过参数配置指定,并且支持链式调用。典型使用场景包括客户端监控、端到端系统性能检测和消息审计。通过拦截器,可以快速观测、验证和监控集群间的客户端性能指标,以及实现消息审计功能。文章还分享了一个具体案例,通过编写拦截器类来统计消息端到端处理的延时,实现了端到端的指标监控,能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的SLA目标。总的来说,Kafka拦截器虽然是冷门功能,但实际上具有重要意义,可以满足实际的需求,为企业级基础架构的公司提供了有力支持。
《Kafka 核心技术与实战》,新⼈⾸单¥68
全部留言(51)
- 最新
- 精选
- 风中花胡老师您好! 我公司现在就我一个人懂点kafka,但是公司线下却有使用kafka,现在知道我学习这个就交给我了,我现在遇到一个线上问题:消息经常堆积起来,不能消费了,重启服务就能继续消费了。我目前得能力还搞不定,望老师能给指点一二 。谢谢。谢谢
作者回复: 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance 主要排查下可能是哪些原因
2019-07-041987 - 落霞与孤鹜这个问题和这期的内容没关系😓。 如果一个主题,由一个应用的名为A的消费组消费,然后把消费组名改为B,重新发布应用,这个时候是不是从主题的分区头开始消费?如何保证从上次A消费组的最新偏移量处开始消费?
作者回复: 我假设你指的名字是group.id。那么把A改成B对于Kafka而言就是新的consumer。新consumer从头还是从最新开始消费取决于auto.offset.reset的设置
2019-06-30320 - 李先生胡哥: consumer消费:比如异步发积分,发积分的消息进入kafka,加积分服务监听kafka的topic,为了避免重复消费,加积分服务获取到消息后先写入mysql,并利用mysql的唯一索引的能力来避免重复消费,然后加积分服务异步去执行mysql中的信息去实现加积分。这种实现方案会导致消费性能低下,但是写入mysql一是避免重复消费,二是做一条成功的记录(便于后期查询)。这种如何优化呢
作者回复: 如果只是这样使用,我倒是不建议用MySQL来做去重,你还不如在应用层面自行去重。一定要用的话,不妨试试把MySQL表为topic key做分区表吧
2020-04-1012 - Lei@时速云👍 胡总出专栏了
作者回复: 磊总别闹:)
2019-06-298 - 打码的土豆老师你好 最近在看kafka官方文档时有一处没看懂还望老师指教 All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache 这上面说的文件系统上的持久日志为什么会是pagecache pagecache不是内存的一部分吗
作者回复: 它这句话的意思是不需要用户手动调用flush来刷盘,由os自己来做:)
2019-07-0127 - 周曙光爱学习拦截器的确是很有用,我们在grpc的拦截器中做限流处理。同理,由于下游存储TPS能力有限,也完全可以在kafka消费的拦截器中做消费限流处理,防止把存储打挂
作者回复: 嗯嗯,算是一个不太常见功能的典型应用了:)
2020-04-096 - IT小僧老师好!这个消费者段onConsume是在消费数据之前执行的方法,并不能真正统计实际处理该消费耗费时间的吧。我觉得理应放在onCommit里面。
作者回复: 是的,你是对的~ 其实放在哪个方法里面取决于你的计时逻辑:)
2020-01-136 - 小马老师有两个疑问请教下: 1、时间的一致性问题。System.currentTimeMillis() - record.timestamp()发送和接收的时间实际上可能来自两台机器,有可能时间不一致,会导致统计结果偏差很大; 2、消费端代码计算时间差是在循环里面进行的,把System.currentTimeMillis()提到循环外面应该会好一点吧,毕竟这一批消息应该算是同时接收的; 3、消息总数是在生产端统计的,时延是在消费端统计的,但是如果在消息传输过程中出现部分消息丢失是不是会影响统计的准确性。
作者回复: 1. 确实有这个问题。用户自己来规避之 2. 同意~ 3. 我们还是能保证消息不丢失的吧:)
2020-01-0946 - 钱胡老师好,请教两个小问题 1:broker通过网络拿到消息后,落盘的规则是什么?来一条消息就落盘一条?还是积攒够一定的量再落盘? 2:从追主,新的消息是主主动推给各个从的?还是从主动拉取的?如果是主动拉取,从怎么知道主有新的消息的?另外,同步的时候是只要有一条新消息就同步嘛? 如果其他同学清楚,也请帮忙解答一下,多谢。
作者回复: 1. 实际上这个交由OS来决定,Kafka不管 2. 从拉取的方式。Kafka定义了水位的概念来标识拉取的进度
2019-08-1436 - 姬广龙Interceptor处理数据是单条的吗,还是多条数据作为一个集合
作者回复: 单条消息
2019-11-234