Kafka 核心技术与实战
胡夕
Apache Kafka Committer,老虎证券技术总监
52815 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 47 讲
开篇词 (1讲)
结束语 (1讲)
Kafka 核心技术与实战
15
15
1.0x
00:00/00:00
登录|注册

20 | 多线程开发消费者实例

方案2代码示例
方案1代码示例
方案2:消息获取和消息处理分离
方案1:每个线程维护专属的KafkaConsumer实例
心跳线程
用户主线程
开放讨论
小结
实现代码示例
多线程方案
Kafka Java Consumer设计原理
怎么实现Kafka Java Consumer端的多线程消费

该思维导图由 AI 生成,仅供参考

你好,我是胡夕。今天我们来聊聊 Kafka Java Consumer 端多线程消费的实现方案。
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

Kafka Java Consumer 设计原理

在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。
谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文介绍了Kafka Java Consumer端的多线程消费方案。首先强调了Kafka Java Consumer的单线程架构,并解释了其设计原理。虽然从0.10.1.0版本开始,KafkaConsumer变为了双线程设计,但实际的消息获取逻辑仍然在用户主线程中完成。相比之前的Scala Consumer的多线程架构,新版本的单线程+轮询机制更好地实现了非阻塞式的消息获取,并简化了Consumer端的设计。文章提出了两种多线程方案,分别是消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,以及使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。对这两种方案的优缺点进行了详细分析,并给出了实现代码示例。最后,鼓励读者结合实际业务场景,选择适合自己的多线程架构,以实现更宏大的系统。整体而言,本文为读者提供了Kafka Java Consumer端多线程消费的实现方案,使其能够更好地理解Kafka Consumer的设计原理和优势。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心技术与实战》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(71)

  • 最新
  • 精选
  • 寂静欢喜
    老师 想问下 心跳线程是和主线程分开的,那么 第一种方案中,主线程阻塞,又怎么会导致超时Rebalance呢?

    作者回复: 应该这么说,心跳线程会定期地检查前端线程是否卡住了,一旦发现卡住了,便会发起主动离组。

    2019-11-27
    3
    24
  • 玉剑冰锋
    Kafka重启时间比较长,每次重启一台差不多四五十分钟,日志保存12个小时,每台数据量差不多几个T,想请教一下老师有什么可以优化的参数吗?

    作者回复: 有可能是要加载的日志段数据太多导致的,可以增加num.recovery.threads.per.data.dir的值

    2019-07-18
    4
    18
  • yic
    老师,关于方案2中的做法,位移提交是有重复消费消息和丢失数据的风险的,有没有什么好的实践呀?

    作者回复: 最好的办法就是自己完全实现一套多线程+管理offset的方案,就像Spark Streaming和Flink做的那样。有兴趣的话可以阅读以下Flink中Kafka Connector的源代码:)

    2020-02-12
    2
    9
  • z.l
    请教个问题,如果使用方案1,一个consumer group订阅了2个topic,每个topic都是24个分区,此时最大线程数可以设置为24还是48?

    作者回复: 理论上是48,但实际上这么多线程反而是开销,可以采用多进程的方式

    2019-07-20
    8
  • 飞翔
    老师 想问一个方案1 谁负责分配线程给每个partition呀 我看您的code 只是没产生一个线程去消费一个主题 如果我有4个parition 那么我产生4个线程来消费这个主题,他会自动均匀分配嘛

    作者回复: “谁负责分配线程给每个partition呀” --- leader consumer负责分配。 会均匀分配的,这是kafka consumer代码保证的

    2019-12-04
    7
  • 归零
    看了作者之前写的帖子(https://www.cnblogs.com/huxi2b/p/6124937.html),有个问题请教下: 在多线程场景下,为什么自动提交位移不会丢消息呢? 比如thread1完成了offset1,3,5然后提交。thread2完成2,4失败了。主线程此时怎么提交呢?是上报1还是5? 这其中的原理是什么呢?希望解答下,谢谢!

    作者回复: 看这篇吧:https://www.cnblogs.com/huxi2b/p/13668061.html 算是终极版,修正了前两篇中可能出现的消息丢失场景

    2021-02-01
    2
    5
  • 随心而至
    方案二感觉没什么必要:这个要考虑的东西太多了,纯粹是给自己埋坑 如何保证任务不会被拒绝,底层的线程池中的队列设置多大才好? 如何异步提交位移? 如何保证分区中记录原来的顺序 我觉得分区实际上是并行的单位,对于生成者是这样,消费者也是这样。你想一个Topic快点,多点分区其实就可以了(但也要合理)

    作者回复: 也是一种思路:)

    2021-01-15
    5
  • 张洋
    老师如果当前consumer group下的consumer instance 只分配了当前主题的一个分区是不是意味着 当前也只能是一个线程来消费消息了

    作者回复: 取决于consumer instance是线程还是进程。通常情况下如果consumer instance是进程的话,还是可以使用多个线程来消费这个获取到的数据。

    2020-05-19
    5
  • 高志强
    老师我现在用Php多进程消费,一个topic 130个分区,我是不是该启动130个进程去消费,目前启动64个进程,但消费能力上不去,消息积压量有几十万了,怎么才能提高消费能力呢

    作者回复: 可以考虑单个进程下再开多线程的方式来增强消费能力,不必一味考虑多进程的方案

    2019-12-25
    4
    4
  • Hale
    如果只有一个broker,一个consumer 一个分区,上面的consumer 组成一个组,一个topic 当consumer 卡住时,协调器会将消费者踢出消费组,进行重新分区分配,但只有一个消费者,那消费者就不能接受到数据了,怎样实现消费者重连

    作者回复: 消费者会自动重连的,如果重连失败,说明网络有问题

    2019-12-24
    4
收起评论
显示
设置
留言
71
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部