Kafka核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
新⼈⾸单¥19.9
3964 人已学习
课程目录
已完结 44 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | 阅读源码,逐渐成了职业进阶道路上的“必选项”
免费
导读 | 构建Kafka工程和源码阅读环境、Scala语言热身
重磅加餐 | 带你快速入门Scala语言
日志模块 (5讲)
01 | 日志段:保存消息文件的对象是怎么实现的?
02 | 日志(上):日志究竟是如何加载日志段的?
03 | 日志(下):彻底搞懂Log对象的常见操作
04 | 索引(上):改进的二分查找算法在Kafka索引的应用
05 | 索引(下):位移索引和时间戳索引的区别是什么?
请求处理模块 (5讲)
06 | 请求通道:如何实现Kafka请求队列?
07 | SocketServer(上):Kafka到底是怎么应用NIO实现网络通信的?
08 | SocketServer(中):请求还要区分优先级?
09 | SocketServer(下):请求处理全流程源码分析
10 | KafkaApis:Kafka最重要的源码入口,没有之一
Controller模块 (5讲)
11 | Controller元数据:Controller都保存有哪些东西?有几种状态?
12 | ControllerChannelManager:Controller如何管理请求发送?
13 | ControllerEventManager:变身单线程后的Controller如何处理事件?
14 | Controller选举是怎么实现的?
15 | 如何理解Controller在Kafka集群中的作用?
状态机模块 (3讲)
16 | TopicDeletionManager: Topic是怎么被删除的?
17 | ReplicaStateMachine:揭秘副本状态机实现原理
18 | PartitionStateMachine:分区状态转换如何实现?
延迟操作模块 (2讲)
19 | TimingWheel:探究Kafka定时器背后的高效时间轮算法
20 | DelayedOperation:Broker是怎么延时处理请求的?
副本管理模块 (6讲)
21 | AbstractFetcherThread:拉取消息分几步?
22 | ReplicaFetcherThread:Follower拉取Leader消息是如何实现的?
23 | ReplicaManager(上):必须要掌握的副本管理类定义和核心字段
24 | ReplicaManager(中):副本管理器是如何读写副本的?
25 | ReplicaManager(下):副本管理器是如何管理副本的?
26 | MetadataCache:Broker是怎么异步更新元数据缓存的?
消费者组管理模块 (7讲)
27 | 消费者组元数据(上):消费者组都有哪些元数据?
28 | 消费者组元数据(下):Kafka如何管理这些元数据?
29 | GroupMetadataManager:组元数据管理器是个什么东西?
30 | GroupMetadataManager:位移主题保存的只是位移吗?
31 | GroupMetadataManager:查询位移时,不用读取位移主题?
32 | GroupCoordinator:在Rebalance中,Coordinator如何处理成员入组?
33 | GroupCoordinator:在Rebalance中,如何进行组同步?
特别放送 (5讲)
特别放送(一)| 经典的Kafka学习资料有哪些?
特别放送(二)| 一篇文章带你了解参与开源社区的全部流程
特别放送(三)| 我是怎么度过日常一天的?
特别放送(四)| 20道经典的Kafka面试题详解
特别放送(五) | Kafka 社区的重磅功能:移除 ZooKeeper 依赖
期中、期末测试 (2讲)
期中测试 | 这些源码知识,你都掌握了吗?
期末测试 | 一套习题,测试你的掌握程度
结束语 (1讲)
结束语 | 源码学习,我们才刚上路呢
Kafka核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

32 | GroupCoordinator:在Rebalance中,Coordinator如何处理成员入组?

胡夕 2020-07-11
你好,我是胡夕。不知不觉间,课程已经接近尾声了,最后这两节课,我们来学习一下消费者组的 Rebalance 流程是如何完成的。
提到 Rebalance,你的第一反应一定是“爱恨交加”。毕竟,如果使用得当,它能够自动帮我们实现消费者之间的负载均衡和故障转移;但如果配置失当,我们就可能触碰到它被诟病已久的缺陷:耗时长,而且会出现消费中断。
在使用消费者组的实践中,你肯定想知道,应该如何避免 Rebalance。如果你不了解 Rebalance 的源码机制的话,就很容易掉进它无意中铺设的“陷阱”里。
举个小例子。有些人认为,Consumer 端参数 session.timeout.ms 决定了完成一次 Rebalance 流程的最大时间。这种认知是不对的,实际上,这个参数是用于检测消费者组成员存活性的,即如果在这段超时时间内,没有收到该成员发给 Coordinator 的心跳请求,则把该成员标记为 Dead,而且要显式地将其从消费者组中移除,并触发新一轮的 Rebalance。而真正决定单次 Rebalance 所用最大时长的参数,是 Consumer 端的 max.poll.interval.ms。显然,如果没有搞懂这部分的源码,你就没办法为这些参数设置合理的数值。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心源码解读》,如需阅读全部文章,
请订阅文章所属专栏新⼈⾸单¥19.9
立即订阅
登录 后留言

精选留言(6)

  • 胡夕 置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~

    上节课,我们重点学习了GroupMetadataManager类读写位移主题的源码。课后我请你分析cleanGroupMetadata方法的流程。实际上,这个方法调用带参数的同名方法cleanupGroupMetadata来执行组位移的清除。后者会遍历给定的所有消费者组,之后调用removeExpiredOffsets方法执行过期位移的清除。清除的主要依据是看当前时间与位移提交的时间的差值是否越过了offsets.retention.minutes参数值。如果越过了则视该位移为过期,需要从offsets中移除。同时,cleanupGroupMetadata方法还会构造tombstone消息并写入到内部位移主题执行主题中的过期位移消息的输出。

    okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-07-21
  • 双椒叔叔
    胡老师,您好
    分区迁移遇到的问题怎么解决呢
    1038,1037,1029-----reassign----->1038,1037,1048
    其实就是1029机子先宕掉了,然后我想要把死掉的1029机子上的副本迁移到1048上

    但是迁移计划卡死了,一直in progress
    replica变成1038,1037,1048,1029了,ISR变成了1038,1048

    现在我想要在replica中remove1029,我看了源码发现是状态机维护的每一个replica状态

    源码中
    1038,1037,1029-----reassign----->1038,1037,1048
    迁移计划卡住的那步是这样的
    要把1029副本的状态从replica中移除流程是
    controller先把1029offline,
    然后
    controller发送状态改变请求(deleted)给1029


    1、first move the replica to offline state (the controller removes it from the ISR)

    2、send stop replica command to the old replicas


    3、Eventually partition reassignment could use a callback that does retries if deletion failed

    如果1029这个节点不在线的话就会
    返回一个回调状态值NonExistentReplica(因为1029现在是死了的状态)。
    reassignment那里的源码大概我看了下,不知道上面的理解对不对
    就想问下,想这种原本3副本,现在执行迁移计划后,replica中多了一个不在线的1029,然后执行计划一直in progress
    那么这种情况如何解决呢?
    是直接在zk中修改该主题的问题分区(执行迁移计划卡主的那个分区)吗?生产中1k个分区,zk中不知道敢不敢修改

    作者回复: 删除zk中/admin/reassign_partitions下对应的节点然后重启broker试试呢

    2020-07-13
    1
  • 云端漫漫步
    GroupState是Stable, CompletingRebalance, Empty这三种的情况下才可以

    作者回复: ������

    2020-07-25
  • Jeff.Smile
    老师您好,想问下如何能明确观察到reblance的过程,因为有时候知道发生了reblance,但是不能确认是什么原因引起的reblance,所以想通过一定手段定位下,比如tcpdump抓包,但是用wireshark打开并转为kafka协议好像也看不出reblance的过程,也可能是我用的不熟练,想请老师可以补充下这方面的,毕竟理论结合实践才是解决问题的途径。
    补充说一下,好像server.log也只能看到说Reblance开始了,Member被移除。。之类等等。

    作者回复: 打开DEBUG日志,会有非常详细的日志输出,重点看看Coordinator部分的DEBUG日志

    2020-07-22
  • 懂码哥(GerryWen)
    胡大大,您的社区名字有点意思。https://issues.apache.org/jira/secure/ViewProfile.jspa?name=huxi_2b
    😀😁😝

    作者回复: 额。。。不是你想的那样:)

    2020-07-13
  • 伯安知心
    从代码上看,进入maybePrepareRebalance的时候,首先把group加入锁中,因为这里要访问消费组的元数据(线程不安全),然后只有一个判断if (group.canRebalance),这个判断主要是判断消费组元数据中的validPreviousStates的map集合中是否存在PreparingRebalance状态的数据,事实上这个状态就是一个过渡的中间状态比如某些成员加入超时的状态,所有成员离开了组,通过分区迁移删除组。

    作者回复: 👍

    2020-07-11
    1
收起评论
6
返回
顶部