20 | 多线程开发消费者实例
该思维导图由 AI 生成,仅供参考
Kafka Java Consumer 设计原理
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了Kafka Java Consumer端的多线程消费方案。首先强调了Kafka Java Consumer的单线程架构,并解释了其设计原理。虽然从0.10.1.0版本开始,KafkaConsumer变为了双线程设计,但实际的消息获取逻辑仍然在用户主线程中完成。相比之前的Scala Consumer的多线程架构,新版本的单线程+轮询机制更好地实现了非阻塞式的消息获取,并简化了Consumer端的设计。文章提出了两种多线程方案,分别是消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,以及使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。对这两种方案的优缺点进行了详细分析,并给出了实现代码示例。最后,鼓励读者结合实际业务场景,选择适合自己的多线程架构,以实现更宏大的系统。整体而言,本文为读者提供了Kafka Java Consumer端多线程消费的实现方案,使其能够更好地理解Kafka Consumer的设计原理和优势。
《Kafka 核心技术与实战》,新⼈⾸单¥68
全部留言(71)
- 最新
- 精选
- 寂静欢喜老师 想问下 心跳线程是和主线程分开的,那么 第一种方案中,主线程阻塞,又怎么会导致超时Rebalance呢?
作者回复: 应该这么说,心跳线程会定期地检查前端线程是否卡住了,一旦发现卡住了,便会发起主动离组。
2019-11-27324 - 玉剑冰锋Kafka重启时间比较长,每次重启一台差不多四五十分钟,日志保存12个小时,每台数据量差不多几个T,想请教一下老师有什么可以优化的参数吗?
作者回复: 有可能是要加载的日志段数据太多导致的,可以增加num.recovery.threads.per.data.dir的值
2019-07-18418 - yic老师,关于方案2中的做法,位移提交是有重复消费消息和丢失数据的风险的,有没有什么好的实践呀?
作者回复: 最好的办法就是自己完全实现一套多线程+管理offset的方案,就像Spark Streaming和Flink做的那样。有兴趣的话可以阅读以下Flink中Kafka Connector的源代码:)
2020-02-1229 - z.l请教个问题,如果使用方案1,一个consumer group订阅了2个topic,每个topic都是24个分区,此时最大线程数可以设置为24还是48?
作者回复: 理论上是48,但实际上这么多线程反而是开销,可以采用多进程的方式
2019-07-208 - 飞翔老师 想问一个方案1 谁负责分配线程给每个partition呀 我看您的code 只是没产生一个线程去消费一个主题 如果我有4个parition 那么我产生4个线程来消费这个主题,他会自动均匀分配嘛
作者回复: “谁负责分配线程给每个partition呀” --- leader consumer负责分配。 会均匀分配的,这是kafka consumer代码保证的
2019-12-047 - 归零看了作者之前写的帖子(https://www.cnblogs.com/huxi2b/p/6124937.html),有个问题请教下: 在多线程场景下,为什么自动提交位移不会丢消息呢? 比如thread1完成了offset1,3,5然后提交。thread2完成2,4失败了。主线程此时怎么提交呢?是上报1还是5? 这其中的原理是什么呢?希望解答下,谢谢!
作者回复: 看这篇吧:https://www.cnblogs.com/huxi2b/p/13668061.html 算是终极版,修正了前两篇中可能出现的消息丢失场景
2021-02-0125 - 随心而至方案二感觉没什么必要:这个要考虑的东西太多了,纯粹是给自己埋坑 如何保证任务不会被拒绝,底层的线程池中的队列设置多大才好? 如何异步提交位移? 如何保证分区中记录原来的顺序 我觉得分区实际上是并行的单位,对于生成者是这样,消费者也是这样。你想一个Topic快点,多点分区其实就可以了(但也要合理)
作者回复: 也是一种思路:)
2021-01-155 - 张洋老师如果当前consumer group下的consumer instance 只分配了当前主题的一个分区是不是意味着 当前也只能是一个线程来消费消息了
作者回复: 取决于consumer instance是线程还是进程。通常情况下如果consumer instance是进程的话,还是可以使用多个线程来消费这个获取到的数据。
2020-05-195 - 高志强老师我现在用Php多进程消费,一个topic 130个分区,我是不是该启动130个进程去消费,目前启动64个进程,但消费能力上不去,消息积压量有几十万了,怎么才能提高消费能力呢
作者回复: 可以考虑单个进程下再开多线程的方式来增强消费能力,不必一味考虑多进程的方案
2019-12-2544 - Hale如果只有一个broker,一个consumer 一个分区,上面的consumer 组成一个组,一个topic 当consumer 卡住时,协调器会将消费者踢出消费组,进行重新分区分配,但只有一个消费者,那消费者就不能接受到数据了,怎样实现消费者重连
作者回复: 消费者会自动重连的,如果重连失败,说明网络有问题
2019-12-244