21 | Kafka Consumer源码分析:消息消费的实现过程
该思维导图由 AI 生成,仅供参考
- 深入了解
- 翻译
- 解释
- 总结
Kafka消费者源码分析文章深入剖析了Kafka消费模型的实现细节。首先介绍了消费模型的要点,包括ConsumerGroup、Partition、Coordinator等概念,以及消费过程中的rebalance和心跳机制。随后通过代码示例展示了KafkaConsumer的使用方法,包括配置信息设置、实例创建、订阅Topic和拉取消息的流程。重点讨论了订阅过程的实现、Consumer与Coordinator的协商、以及拉取消息的实现。文章还分析了Kafka在并发检测方面的做法和元数据更新的时机。整体而言,本文通过源码分析深入剖析了Kafka消费者的实现细节,对于想深入了解Kafka消费模型和源码实现的读者具有一定的参考价值。文章还强调了Kafka消费者采用完全异步的设计思想,通过构建Request对象、暂存入发送队列、择机发送请求和处理响应的方式实现高吞吐量。同时,也指出了这种设计的复杂性和维护成本。最后,提出了思考题,引发读者对Kafka Consumer消费位置维护和提交的思考。
《消息队列高手课》,新⼈⾸单¥59
全部留言(29)
- 最新
- 精选
- Peter课后作业,希望老师指正。 在基础篇03的时候讲过消费位置是消息队列服务器针对每个消费组和每个队列维护的一个位置变量。那么也就是说最终真正更新这个位置变量应该是交由服务器去执行的,而Consumer只是发送一个请求。那么顺着这个思路,我猜应该是在更新元数据的时候就应该发送这个请求,原因很简单:消费者需要知道“从哪发起”并且“发多少”,因此这时就已经知道了应该将消费位置更新为多少了,所以这时候就可以发送这个请求了。至于服务器最终会将消费位置更新为多少,还取决于客户端返回的结果。 在方法updateAssignmentMetadataIfNeeded中,最后一行return updateFetchPositions(timer); 从updateFetchPositions这个方法点进去,看到coordinator.refreshCommittedOffsetsIfNeeded(timer) 这个方法点进去之后会看到fetchCommittedOffsets方法,进这个方法,找到sendOffsetFetchRequest,点进去,最终会发现 client.send(coordinator, requestBuilder)
作者回复: 细致👍
2019-11-01222 - 鲁班大师老师,kafak consumer 在reblance期间,如何实现不重复消费
作者回复: 实现“不重复消费”是非常困难的,你需要做的就是让你的consumer具备幂等性,这样即使发生重复消费,也不会对系统数据产生任何影响。
2020-06-0136 - 山头老师,你好,broker和消费端都重启了,消费端还知道从哪个offset开始消费吗
作者回复: 从服务端的协调者获取。服务端的协调者会记录主题的每个消费组的每个分区当前的消费位置
2019-09-3046 - 凌空飞起的剪刀腿老师您好: kafka consumer中没有分析到心跳线程是怎么处理的,我看源代码上写的是单独开了一个后台线程负责心跳,这样处理的优势是什么啊?
作者回复: Kafka Consumer与服务端的协调者维护心跳,而协调者所在的Broker不一定和接收消息的Broker是同一个实例。 所以,必须得分开。
2020-04-133 - 鲁班大师每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;……在rebalance期间应该是不能消费的吧
作者回复: 是的,消费会暂停,直到Rebalance完成。
2020-05-212 - SKang老师 我看完之后 可以理解为 消费组A 消费一个cc主题的消息,然后过程中我将消费组A 的名字改成消费组B后,不会出现重复消费,只会接着A的 继续消费剩下的吧 我认为毕竟A已经成功消费了 偏移量已经成功被更新了吧
作者回复: 不是的,不同消费组的偏移量是分开记录的。
2020-03-191 - 山头消费者如何从服务端拉取消息的,用for循环效率太低吧,能否说说实际的代码
作者回复: 同学,我们这节课通篇就是讲得这个问题啊,给你们讲解使用的就是实际的代码。
2019-09-1351 - 蛤蟆先生有个问题请教一下老师,目前我们公司某个应用在生产环境一共有两台机器,这时候有一台机器挂了,但是某个消息还是会经常消费到这台挂的机器上,导致消息没有消费成功,这是为什么呢?
作者回复: 可以贴一下Topic的具体配置,大家一起帮你分析一下原因。
2020-06-15 - 鲁班大师多个consumer消费同一个partition会有什么问题么
作者回复: 如果是不同的group.id,互相之间没有影响。
2020-05-213 - lizhibo老师好,kafka消息要是在消费端消费出现异常了怎么办,他没有再次消费的机制,比如1分之后再去消费,这个怎么实现
作者回复: 所以消费的时候,如果不能接受丢消息,一定不要设置成自动提交消费位置。这样下次拉取的时候,还会拉到这个位置的消息。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L)); // 执行消费业务逻辑,然后再提交消费位置。 consumer.commitSync();
2020-05-134