Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

21 | Java 消费者是如何管理TCP连接的?

胡夕 2019-07-20
你好,我是胡夕。今天我要和你分享的主题是:Kafka 的 Java 消费者是如何管理 TCP 连接的。
在专栏第 13 讲中,我们专门聊过“Java生产者是如何管理 TCP 连接资源的”这个话题,你应该还有印象吧?今天算是它的姊妹篇,我们一起来研究下 Kafka 的 Java消费者管理 TCP 或 Socket 资源的机制。只有完成了今天的讨论,我们才算是对 Kafka 客户端的 TCP 连接管理机制有了全面的了解。
和之前一样,我今天会无差别地混用 TCP 和 Socket 两个术语。毕竟,在 Kafka 的世界中,无论是 ServerSocket,还是 SocketChannel,它们实现的都是 TCP 协议。或者这么说,Kafka 的网络传输是基于 TCP 协议的,而不是基于 UDP 协议,因此,当我今天说到 TCP 连接或 Socket 资源时,我指的是同一个东西。

何时创建 TCP 连接?

我们先从消费者创建 TCP 连接开始讨论。消费者端主要的程序入口是 KafkaConsumer 类。和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,也就是说,当你执行完 new KafkaConsumer(properties) 语句后,你会发现,没有 Socket 连接被创建出来。这一点和 Java 生产者是有区别的,主要原因就是生产者入口类 KafkaProducer 在构建实例的时候,会在后台默默地启动一个 Sender 线程,这个 Sender 线程负责 Socket 连接的创建。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(21)

  • 常超
    整个生命周期里会建立4个连接,进入稳定的消费过程后,同时保持3个连接,以下是详细。
    第一类连接:确定协调者和获取集群元数据。
     一个,初期的时候建立,当第三类连接建立起来之后,这个连接会被关闭。

    第二类连接:连接协调者,令其执行组成员管理操作。
     一个

    第三类连接:执行实际的消息获取。
    两个分别会跟两台broker机器建立一个连接,总共两个TCP连接,同一个broker机器的不同分区可以复用一个socket。
    2019-07-20
    2
    13
  • 信信
    一共建过四次连接。若connection.max.idle.ms 不为-1,最终会断开第一次连的ID为-1的连接。
    2019-07-24
    1
  • Williamzhang
    我觉得作者可以跟学员的留言互动,然后每期课后思考可以在下期中贴出答案及分析,其实留言讨论也是一个非常让人有收获的地方
    2019-07-22
    1
  • rm -rf 😊ི
    我认为也是3个连接,第一个是查找Coordinator的,这个会在后面断开。然后5个partition会分布在2个broker上,那么客户端最多也就连接2次就能消费所有partition了,因此是连接3个,最后保持2个。
    2019-07-21
    1
  • October
    总共创建4个连接,最终保持3个连接:
            确定消费者所属的消费组对应的GroupCoordinator和获取集群的metadata时创建一个TCP连接,由于此时的node id = -1,所以该连接无法重用。
            连接GroupCoordinator时,创建第二个TCP连接,node id值为Integer.MAX_VALUE-id
            消费者会与每个分区的leader创建一个TCP连接来消费数据,node id为broker.id,由于kafka只是用id这一维度来表征Socket连接信息,因此如果多个分区的leader在同一个broker上时,会共用一个TCP连接,由于分区数大于broker的数量,所以会创建两个TCP连接消费数据。
    2019-07-21
    1
  • nightmare
    3个tcp连接 一个查询协调着和获取元数据的tcp连接 一个连接协调写 管理组成员的tcp连接 主题5个分区只有连接leader副本的broker需要创建连接
    2019-07-20
    1
  • 大鸡腿
    胡大佬,问下 "它连接的 Broker 节点的 ID 是 -1,表示消费者根本不知道要连接的 Kafka Broker 的任何信息。" 这边会有真实的broker机器与之对应嘛?

    作者回复: 会有的,只是暂时还不知道ID

    2019-12-09
  • 注定非凡
    1,何时创建
    A :消费者和生产者不同,在创建KafkaConsumer实例时不会创建任何TCP连接。
    原因:是因为生产者入口类KafkaProducer在构建实例时,会在后台启动一个Sender线程,这个线程是负责Socket连接创建的。

    B :TCP连接是在调用KafkaConsumer.poll方法时被创建。在poll方法内部有3个时机创建TCP连接
    (1)发起findCoordinator请求时创建
    Coordinator(协调者)消费者端主键,驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。
    当消费者程序首次启动调用poll方法时,它需要向Kafka集群发送一个名为FindCoordinator的请求,确认哪个Broker是管理它的协调者。

    (2)连接协调者时
    Broker处理了消费者发来的FindCoordinator请求后,返回响应显式的告诉消费者哪个Broker是真正的协调者。
    当消费者知晓真正的协调者后,会创建连向该Broker的socket连接。
    只有成功连入协调者,协调者才能开启正常的组协调操作。

    (3)消费数据时
    消费者会为每个要消费的分区创建与该分区领导者副本所在的Broker连接的TCP.

    2 创建多少
    消费者程序会创建3类TCP连接:
    (1) :确定协调者和获取集群元数据
    (2):连接协调者,令其执行组成员管理操作
    (3) :执行实际的消息获取

    3 何时关闭TCP连接
    A :和生产者相似,消费者关闭Socket也分为主动关闭和Kafka自动关闭。
    B :主动关闭指通过KafkaConsumer.close()方法,或者执行kill命令,显示地调用消费者API的方法去关闭消费者。
    C :自动关闭指消费者端参数connection.max.idle.ms控制的,默认为9分钟,即如果某个socket连接上连续9分钟都没有任何请求通过,那么消费者会强行杀死这个连接。
    D :若消费者程序中使用了循环的方式来调用poll方法消息消息,以上的请求都会被定期的发送到Broker,所以这些socket连接上总是能保证有请求在发送,从而实现“长连接”的效果。
    E :当第三类TCP连接成功创建后,消费者程序就会废弃第一类TCP连接,之后在定期请求元数据时,会改为使用第三类TCP连接。对于一个运行了一段时间的消费者程序来讲,只会有后面两种的TCP连接。
    2019-11-06
  • 举个荔枝
    老师,想问下这里是不是笔误。
    还记得消费者端有个组件叫Coordinator吗?协调者应该是位于Broker端的吧?

    作者回复: 嗯嗯,其实这里的消费者端指的是广义的消费者,我是想说在Kafka消费者的概念中有Coordinator。当然如你所说Coordinator是Broker端的组件没错。这里的确有不严谨的地方,多谢指出:)

    2019-10-02
  • 天天向上
    元数据不包含协调者信息吗?为啥还要再请求一次协调者信息 什么设计思路?

    作者回复: 不包括,因为你请求元数据的broker可能不是Coordinator,没有Coordinator的信息

    2019-09-19
  • 兔2🐰🍃
    有2个 Broker,5个分区的领导者副本,由zookeeper分配Leader,所以默认是均匀的,第三类会创建2个TCP连接,故共有4个TCP连接。
    请问胡老师 Leader副本 分配策略是什么?

    作者回复: 如果是不考虑机架信息,你基本上可以认为是round robin策略

    2019-09-09
  • Mark
    我们要设计一个消息系统。有两个选择,更好的一种是每种不同schema的消息发一个topic。但是有一种担心是consumer会为每个topic建立一个连接,造成连接数太多。请问胡老师,kafka client的consumer是每个集群固定数目的tcp连接,还是和topic数目相关?

    作者回复: 和它要订阅的topic分区数以及这些分区在broker上的散列情况有关。比如你订阅了100个分区,但这个100个分区的leader副本都在一个broker上,那么长期来看consumer也就只和这1个broker建立连接;相反如果这100分区散列在100个broker上,那么长期来看consumer会和100个broker维持长连接

    2019-08-29
    1
  • 落霞与孤鹜
    最终会保留三个吧,协调者链接需要与其他链接特地分开。
    2019-08-02
  • Hello world
    老师,如果是调用consumer.createMessageStreams()这个方法,那这样也是建立min(broker数,分区leader数)+1个tcp连接吗?还有建立一个连接会启动一个线程的吧,我看了我的java进程下有几十个线程,但是好像也远远大于broker的数量。
    2019-07-23
  • taj3991
    老师同一个消费组的客户端都只会连接到一个协调者吗?

    作者回复: 是的。每个group都有一个与之对应的coordinator

    2019-07-22
  • 吴宇晨
    一个获取元数据的连接(之后会断开)+两个连接分区leader的连接+一个连接协调者的连接
    2019-07-22
  • 30斤的大番薯
    老师您好。我之前搭建了一个单机kafka,能正常收发消息。最近我按网上的介绍,把kafka改成集群,出现一个问题:使用kafka自带的命令行生产者工具可以成功发送消息。但是用命令行消费者工具总是接收不到数据(启动消费者一直不输出数据,仿佛没收到消息一样)。我用strace跟踪发现消费者进程一在循环进行epoll(超时)调用,kafka服务器日志无异常。请问这种情况要怎么检查问题。
    2019-07-22
    1
  • 小木匠
    “负载是如何评估的呢?其实很简单,就是看消费者连接的所有 Broker 中,谁的待发送请求最少。”
    老师这个没太明白,这时候消费者不是还没连接么?那这部分信息是从哪获取到的呢?消费者本地吗?

    作者回复: 刚开始的时候当然就类似于随机选broker了,但后面慢慢积累了一些数据之后这个小优化还是会起一些作用的

    2019-07-22
  • 杨陆伟
    我觉得是3个TCP连接,查找协调者1个连接,连接两个Broker2个连接,且查找协调者的连接慢慢会关闭
    2019-07-20
    1
  • 明翼
    连接有三个阶段:首先获取协调者连接同时也获取元数据信息,这个连接后面会关闭;连接协调者执行,等待分配分区,组协调等,这需要一个连接;后面真正消费五个分区两个broker最多就两个连接,分区大于broker所以一定是两个,因为第一类连接没有id,所以无法重用,会在第三类开启连接后关闭,所以开始四个连接最终保持三个连接
    2019-07-20
收起评论
21
返回
顶部