Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

13 | Java生产者是如何管理TCP连接的?

胡夕 2019-07-02
你好,我是胡夕。今天我要和你分享的主题是:Kafka 的 Java 生产者是如何管理 TCP 连接的。

为何采用 TCP?

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。无论是生产者、消费者,还是 Broker 之间的通信都是如此。你可能会问,为什么 Kafka 不使用 HTTP 作为底层的通信协议呢?其实这里面的原因有很多,但最主要的原因在于 TCP 和 HTTP 之间的区别。
从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。
所谓的多路复用请求,即 multiplexing request,是指将两个或多个数据流合并到底层单一物理连接中的过程。TCP 的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。其实严格来说,TCP 并不能多路复用,它只是提供可靠的消息交付语义保证,比如自动重传丢失的报文。
更严谨地说,作为一个基于报文的协议,TCP 能够被用于多路复用连接场景的前提是,上层的应用协议(比如 HTTP)允许发送多条消息。不过,我们今天并不是要详细讨论 TCP 原理,因此你只需要知道这是社区采用 TCP 的理由之一就行了。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(45)

  • 柠檬C
    应该可以用懒加载的方式,实际发送时再进行TCP连接吧,虽然这样第一次发送时因为握手的原因会稍慢一点
    2019-07-02
    1
    7
  • kursk.ye
    试想一下,在一个有着 1000 台 Broker 的集群中,你的 Producer 可能只会与其中的 3~5 台 Broker 长期通信,但是 Producer 启动后依次创建与这 1000 台 Broker 的 TCP 连接。一段时间之后,大约有 995 个 TCP 连接又被强制关闭。这难道不是一种资源浪费吗?很显然,这里是有改善和优化的空间的。

    这段不敢苟同。作为消息服务器中国,连接应该是种必要资源,所以部署时就该充分给予,而且创建连接会消耗CPU,用到时再创建不合适,我甚至觉得Kafka应该有连接池的设计。

    另外最后一部分关于TCP关闭第二种情况,客户端到服务端没有关闭,只是服务端到客户端关闭了,tcp是四次断开,可以单方向关闭,另一方向继续保持连接

    作者回复: 嗯嗯,欢迎不同意见。Kafka对于创建连接没有做任何限制。如果一开始就创建所有TCP连接,之后因为超时的缘故又关闭这些连接,当真正使用时再次创建,那么为什么不把创建时机后延到真正需要的时候呢?实际场景中将TCP连接设置为长连接的情形并不多见,因此我说这种设计是可以改进的。

    2019-07-02
    3
  • 开水
    觉得创建kafkaProducer的时候可以不用去创建sender线程去连接broker。
    1. 第一次更新元数据的时候,配置一个并发连接参数,比如说10,按照该连接参数的余数去和配置中broker建立TCP连接。
    2. 获取到相应的metadata信息后,再去和相应的broker进行连接,连接建立后关闭掉无用的连接。
    3. 按照原有设计,发送数据时再次检查连接。

    这样多余连接不会超过10,并且可配置。而且在更新metadata和发送数据时进行了连接的双重监测,不用进行三次监测。
    2019-07-02
    2
    3
  • 诗泽
    看来无论在bootstrap.servers中是否写全部broker 的地址接下来producer 还是会跟所有的broker 建立一次连接😂
    2019-07-02
    3
  • KEEPUP
    KafkaProducer 实例只是在首次更新元数据信息之后,创建与集群中所有 Broker 的 TCP 连接,还是每次更新之后都要创建?为什么要创建与所有 Broker 的连接呢?
    2019-07-02
    1
    2
  • 明翼
    我的想法这样的,先用客户端去连配置的第一broker server,连不上就连接第二个,一旦连上了,就可以获取元数据信息了,这是创建produce实例时候动作,只发起一个TCP连接。再send时候发现没连接的再连接,至于其他的都还是很合理的。
    2019-07-02
    2
  • 旭杰
    Producer 通过 metadata.max.age.ms定期更新元数据,在连接多个broker的情况下,producer是如何决定向哪个broker发起该请求?

    作者回复: 向它认为当前负载最少的节点发送请求,所谓负载最少就是指未完成请求数最少的broker

    2019-07-23
    1
  • JoeyLi666
    老师,最近使用kakfa,报了个异常:
    Caused by: org.apache.kafka.common.KafkaException: Record batch for partition Notify-18 at offset 1803009 is invalid, cause: Record is corrupt (stored crc = 3092077514, computed crc = 2775748463)
    kafka的数据还会损坏,不是有校验吗?

    作者回复: 也可能是网络传输过程中出现的偶发情况,通常没有什么好的解决办法。。。

    2019-07-04
    1
    1
  • 南辕北辙
    老师您好,我在本地分别用1.x版本和2.x版本的生产者去测试,为什么结果和老师的不一样呢。初始化KafkaProducer时,并没有与参数中设置的所有broker去建立连接,然后我sleep十秒,让sender线程有机会多运行会。但是还是没有看到去连接所有的broker。只有当运行到procuder.send时才会有Initialize connection日志输出,以及由于metadata的needUpdate被更新成true,sender线程会开始有机会去更新metadata去连接broker(产生Initialize connection to node...for sending metadata request)。之前学习源码的时候,也只注意到二个地方去连接broker(底层方法initiateConnect,更新metadata时建立连接以及发送数据时判断待发送的node是否建立了连接)。老师是我哪里疏忽了吗,还是理解有问题。翻阅老师的书上TCP管理这块貌似没有过多的讲解,求老师指导。。

    作者回复: 我不知道您这边是怎么实验的,但是我这边的确会创建新的TCP连接~~

    2019-07-03
    1
  • 张庆
    主要的资源浪费应该是在第一次获取元数据的时候创建所有的连接,应该是这个地方可以做一些优化吧,可以做一个最小初始化数量,从元数据中随机获取配置的最少数据,然后进行初始化。然后向Broker发送消息的时候在去判断,如果没有连接就创建连接,这样应该可以折中一下吧。
    2019-07-02
    1
  • 注定非凡
    Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他协议的
    1 为什采用TCP?
    (1)TCP拥有一些高级功能,如多路复用请求和同时轮询多个连接的能力。
    (2)很多编程语言的HTTP库功能相对的比较简陋。
    名词解释:
    多路复用请求:multiplexing request,是将两个或多个数据合并到底层—物理连接中的过程。TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。严格讲:TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文。

    2 何时创建TCP连接?
    (1)在创建KafkaProducer实例时,
    A:生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时,首先会创建与Broker的连接。
    B:此时不知道要连接哪个Broker,kafka会通过METADATA请求获取集群的元数据,连接所有的Broker。
    (2)还可能在更新元数据后,或在消息发送时
    3 何时关闭TCP连接
    (1)Producer端关闭TCP连接的方式有两种:用户主动关闭,或kafka自动关闭。
    A:用户主动关闭,通过调用producer.close()方关闭,也包括kill -9暴力关闭。
    B:Kafka自动关闭,这与Producer端参数connection.max.idles.ms的值有关,默认为9分钟,9分钟内没有任何请求流过,就会被自动关闭。这个参数可以调整。
    C:第二种方式中,TCP连接是在Broker端被关闭的,但这个连接请求是客户端发起的,对TCP而言这是被动的关闭,被动关闭会产生大量的CLOSE_WAIT连接。

    作者回复: 总结得相当强:)

    2019-10-31
  • worry
    我觉得在创建KafkaProducer时就通过Sender线程与某些Broker创建连接的主要目的是获取集群元数据,不一定会和集群的所有broker创建连接。发送消息时,如果和leader节点没有连接就会创建,发送完消息如果没有数据要发送,连接就会被关闭释放资源。
    2019-10-06
  • rhwayfun
    Kafka强依赖zookeeper还是有很大风险的,之前公司就出现zookeeper被打挂的场景
    2019-10-05
  • B+Tree
    第二章有写到:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。如果bootstrap.servers 参数中没有设置领导者副本地址,那么就得追随者副本同步数据到领导者副本?

    作者回复: bootstrap.servers设置的是连接Kafka的broker信息,和副本没有关系啊

    2019-10-05
  • 云师兄
    Kafka自动关闭连接,被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,当发起端被确认为空闲才会被被动关闭,理论数据传输量没有了,那为什么还有很多close wait?

    作者回复: 因为是被动关闭,所以才有CLOSE_WAIT,和是否有传输量关系不大

    2019-09-30
  • 有钱的包子
    老师有没有试卷题之类的

    作者回复: hmmm.... 我这边没有,网上倒是有一些面试题

    2019-09-24
  • cgddw
    broker有很多time_wait端口,甚至比established多,这是什么情况

    作者回复: 可能有很多clients端连接过该broker,而clients又都没有正常关闭所致

    2019-08-27
    1
  • cgddw
    针对第二种出现的大量无用链接,有什么好的解决办法吗?生产中发现,非java语音出现过这种大量链接占满了broker机器的连接数

    作者回复: 其他语言管理TCP资源的方式和Java版本的不一定相同。可以参考一下github官网,看看是否已知的bug。另外也取决于是什么语言?

    2019-08-26
  • yzh
    老是您好,咨询两个问题。
    1. Producer实例创建和维护的tcp连接在底层是否是多个Producer实例共享的,还是Jvm内,多个Producer实例会各自独立创建和所有broker的tcp连接
    2.Producer实例会和所有broker维持连接,这里的所有,是指和topic下各个分区leader副本所在的broker进行连接的,还是所有的broker,即使该broker下的所有topic分区都是flower

    作者回复: 1. 这些TCP连接只会被Producer实例下的Sender线程使用。多个Producer实例会创建各自的TCP连接
    2. 从长期来看,只和需要交互的Broker有连接

    2019-08-20
  • 扬一场远远的风
    内容与2018年出版的那本kafka差不多。请教一个问题,线上环境(量比较大,每天的消息量能达到1T),连续性报“java.io.IOException: Connection to 2 was disconnected before the response was read”。我贴一下报错信息:
    2019-08-18 11:46:37,499] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=2] Error sending fetch request (sessionId=88469530, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
    ......... t-0-logs-11=(offset=0, logStartOffset=0, maxBytes=1698029824), t0_pt_error-6=(offset=9999, logStartOffset=9999, maxBytes=1698029824), t0_pt_find_pid-5=(offset=4439, logStartOffset=4439, maxBytes=1698029824), t0_pt_extract_patient-15=(offset=18400, logStartOffset=18400, maxBytes=1698029824), __consumer_offsets-25=(offset=83, logStartOffset=0, maxBytes=1698029824), t0_qmjkda_extract_patient-9=(offset=877, logStartOffset=12, maxBytes=1698029824), t0_qmjkda_find_pid-5=(offset=5105, logStartOffset=5105, maxBytes=1698029824), t0_pt_find_pid-35=(offset=4321, logStartOffset=4321, maxBytes=1698029824), bigscreenRdData-18=(offset=1, logStartOffset=1, maxBytes=1698029824), t0_pt_etldr_mapping-4=(offset=16075, logStartOffset=16075, maxBytes=1698029824)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=88469530, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
    java.io.IOException: Connection to 2 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:223)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:146)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

    作者回复: 查看一下Broker id=2的broker进程是否启动吧,从日志上来说,follower无法连接到这个broker上的leader了

    2019-08-18
    1
收起评论
45
返回
顶部