• 趙衍
    2019-07-27
    老师讲的很好,我做一些补充吧。

    Kafka在启动的时候会开启两个任务,一个任务用来定期地检查是否需要缩减或者扩大ISR集合,这个周期是replica.lag.time.max.ms的一半,默认5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合,当检查到有Follower的HighWatermark追赶上Leader时,就会扩充ISR。

    除此之外,当ISR集合发生变更的时候还会将变更后的记录缓存到isrChangeSet中,另外一个任务会周期性地检查这个Set,如果发现这个Set中有ISR集合的变更记录,那么它会在zk中持久化一个节点。然后因为Controllr在这个节点的路径上注册了一个Watcher,所以它就能够感知到ISR的变化,并向它所管理的broker发送更新元数据的请求。最后删除该路径下已经处理过的节点。

    此外,在0.9X版本之前,Kafka中还有另外一个参数replica.lag.max.messages,它也是用来判定失效副本的,当一个副本滞后leader副本的消息数超过这个参数的大小时,则判定它处于同步失效的状态。它与replica.lag.time.max.ms参数判定出的失效副本取并集组成一个失效副本集合。

    不过这个参数本身很难给出一个合适的值。以默认的值4000为例,对于消息流入速度很低的主题(比如TPS为10),这个参数就没什么用;对于消息流入速度很高的主题(比如TPS为2000),这个参数的取值又会引入ISR的频繁变动。所以从0.9x版本开始,Kafka就彻底移除了这一个参数。
    展开
     1
     55
  • 凯
    2019-08-01
    请问一下,producer生产消息ack=all的时候,消息是怎么保证到follower的,因为看到follower是异步拉取数据的,难道是看leader和follower上面的offset吗?

    作者回复: 通过HW机制。leader处的HW要等所有follower LEO都越过了才会前移

     1
     10
  • 刘彬
    2019-07-25
    老师好,想请教您两个问题,如下:
    如果某个follower副本同步持续慢于leader副本写入速度,repkica.lag.time.max.ms 是对于二者的同步时间做的判断,我理解就是如果一直检查10s follower都赶不上leader副本的进度!
    但是,这个同步进度是用哪一块进行判别的呢?是通过index值吗?
    另外,如果某个follower不在ISR中了,kafka如果维持副本数均衡呢?比如设置了副本数为3,其中一个副本不在ISR集合中了,那么就一直少了一个副本吗?前提是这个副本一直没有跟上leader的同步进度!
    谢谢!
    展开

    作者回复: 1. 通过比较follower和leader的最新消息位移或末端消息位移(Log End Offset, LEO)
    2. 嗯,就一直少一个副本了

     2
     3
  • 曹伟雄
    2019-12-16
    老师你好,有个问题请教一下,麻烦抽空看看,谢谢。
    生产环境,因磁盘满了,所有broker宕机了,重启集群后,主题中的部分分区中,有1个副本被踢出ISR集合,只剩下leader副本了。

    试了以下几种方法都没有自动加入进来:
    1、等了3天后还是没有加入到ISR;
    2、然后重启kafka集群;
    3、用kafka-reassign-partitions.sh命令重新分配分区;

    针对此情况,请问一下有什么办法让它自动加入进来? 或者手工处理加入进来也可以。
    有什么命令可以查看follower落后多少吗? 麻烦老师给点建议或解决思路,谢谢。
    展开

    作者回复: 试试到ZooKeeper中手动删除/controller节点。这通常都是因为Controller与ZooKeeper状态不同步导致的。

    试试这个命令吧: rmr /controller

    确保在业务低峰时刻执行这个命令

    
     1
  • Mick
    2019-08-02
    老师,LEO和HW这两个概念不理解,能不能详细说下,谢谢

    作者回复: 一个分区有3个副本,一个leader,2个follower。producer向leader写了10条消息,follower1从leader处拷贝了5条消息,follower2从leader处拷贝了3条消息,那么leader副本的LEO就是10,HW=3;follower1副本的LEO是5。这样说清楚些吗

     2
     1
  • 张学磊
    2019-07-27
    个人觉得只所以使用消息队列进行异步处理就不会太关心消息处理的及时性,那当允许Follower副本对外提供读请求时,第一消费者可以降低读取消息的频率,给予Follower副本一定同步的时间,第二消费者在读取消息是优先读取Follower副本中的信息,如果读取不到再转到Leader副本中进行读取。
    
     1
  • 公号-云原生程序员
    2019-07-25
    本章原理和过程讲得很清楚。👍
    
     1
  • 陈国林
    2020-01-18
    老师好,我觉得是否可以这样分场景。对于读新的数据可以从 leader replica 读取,对于老一些的数据从follower replica 读取,这样不懂是否可行

    作者回复: 初衷是好的。难点在于我们如何区分什么数据是新的什么是老的:)

    
    
  • Geek_bcfd1e
    2019-12-16
    老师,假设一个分区有5个副本,Broker的min.insync.replicas设置为2,生产者设置acks=all,这时是有2个副本同步了就可以,还是必须是5个副本都同步,他们是什么关系。

    作者回复: Producer端认为消息已经成功提交的条件是:ISR中所有副本都已经保存了该消息,但producer并没有指定ISR中需要几个副本。这就是min.insync.replicas参数的作用。

    正常情况下,如果5个副本都在ISR中,那么它们必须都同步才行,但如果4个副本不在ISR中了,不满足min.insync.replicas了,此时broker会抛出异常给producer,告诉producer这条消息无法正确保存

    
    
  • 洪东楗
    2019-11-27
    您好,老师,自己实验中有个问题想请教下,broker有三台,topic有1分区3副本,min.insync.replicas设置为2的时候,消息都发送成功了,但是消费者没消费到数据,把min.insync.replicas设置为1,消费者就消费到数据了,希望您能帮忙解答下

    作者回复: 你是如何确定消息发送成功了呢?可以使用KafkaConsumer.endOffsets验证下消息确实写入成功了,

    
    
  • man1s
    2019-11-26
    打破follower 不可读是指让leader 写,某一个follower读吗?同一个分区应该不能同时读多个副本吧,同时读多个副本offset的维护想想就可怕

    作者回复: 有兴趣看看KIP-392的设计吧(https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)

    
    
  • 注定非凡
    2019-11-07
    1 副本机制的定义:所谓副本机制(Replication),也可以称之为备份机制,通常是指分布式在多台网络互连的机器上保存有相同的数据拷贝。    
    2 副本机制的价值:A :提供数据冗余 B :提供高伸缩性 C :改善数据局部性
    但 Kafka的副本机制,只实现了提供数据冗余的价值。
        
    3 副本定义:
    A :Kafka有主题的概念,每个主题又分为若干个分区。副本的概念是在分区层级下定义的,每个分区配置有若干个副本。
    B :所谓副本(Replica),本质是一个只能追加写消息的提交日志。
       根据Kafka副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用。
        
    4 副本角色:
    A :为解决分区下多个副本的内容一致性问题,常用方案就是采用基于领导者的副本机制。
    B :在kafka中,副本分两类:领导者副本和追随者副本。每个分区在创建时都选举一个副本,称为领导者副本,其余的副本自动成为追随者副本。
    C :Kafka的副本机制比其他分布式系统严格。Kafka的追随者副本不对外提供服务。所有的请求都要由领导者副本处理。追随者副本唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
    D :当领导者副本所在Broker宕机了,Kafka依托于Zookeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个新的领导者。当老的Leader副本重启回来后,只能作为追随者副本加入到集群中。

    4 Kafka副本机制的优点:
    A :方便实现“Read-your-writes”
    (1)含义:当使用生产者API向Kafka成功写入消息后,马上使用消息者API去读取刚才生产的消息。
    (2)如果允许追随者副本对外提供服务,由于副本同步是异步的,就可能因为数据同步时间差,从而使客户端看不到最新写入的消息。    
    B :方便实现单调读(Monotonic Reads)
    (1)单调读:对于一个消费者用户而言,在多处消息消息时,他不会看到某条消息一会存在,一会不存在。
    (2)如果允许追随者副本提供读服务,由于消息是异步的,则多个追随者副本的状态可能不一致。若客户端每次命中的副本不同,就可能出现一条消息一会看到,一会看不到。
        
    5 In-sync Replicas(ISR)同步副本
    A :追随者副本定期的异步拉取领导者副本中的数据,这存在不能和Leader实时同步的风险。
    B :Kafka引入了In-sync Replicas。ISR中的副本都是于Leader同步的副本,相反,不在ISR中的追随者副本就是被认为是与Leader不同步的。
    C :Leader 副本天然就在ISR中,即ISR不只是追随者副本集合,他必然包括Leader副本。甚至某些情况下,ISR只有Leade这一个副本。
    D :follower副本是否与leader同步的判断标准取决于Broker端参数 replica.lag.time.max.ms参数值。默认为10秒,只要一个Follower副本落后Leader副本的时间不连续超过10秒,那么Kafka就认为该Follower副本与leader是同步的,即使此时Follower副本中保存的消息明显小于Leader副本中的消息。
    E :如果同步过程持续慢于Leader副本消息的写入速度,那么replica.lag.time.max.ms时间后,此Follower副本就会被认为是与Leader副本不同步的,因此不能再放入ISR中。此时,kafka会自动收缩ISR的进度,将该副本“踢出”ISR。ISR是一个动态调整的集合,而非静态不变的。

    6 Unclean 领导者选举(Unclean Leader Election)
        A :ISR是可以动态调整的,所以会出现ISR为空的情况,由于Leader副本天然就在ISR中,如果ISR为空了,这说明Leader副本也挂掉了,Kafka需要重新选举一个新的Leader。
        B :Kafka把所有不在ISR中的存活副本都会称为非同步副本。通常,非同步副本落后Leader太多,如果让这些副本做为新的Leader,就可能出现数据的丢失。在kafka中,选举这种副本的过程称为Unclean领导者选举。
        C :Broker端参数unclean.leader.election.enable 控制是否允许Unclean领导者选举。开启Unclean领导者选举可能会造成数据丢失,但它使得分区Leader副本一直存在,不至于停止对外提供服务,因此提升了高可用性。禁止Unclean领导者选举的好处是在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
    展开
    
    
  • 13761642169
    2019-10-18
    配置是 unclean.leader.election.enable=false
    1个分区有2个副本,r1 和 r2,isr 中只有 r1,r1 所在机器崩溃后,并且日志数据也丢失了,这种情况怎样操作让分区恢复服务?

    作者回复: 只能把r2启动起来了,而且有可能出现数据丢失。因为Kafka承诺不丢消息也是有条件的

    
    
  • 修愿三秋
    2019-10-14
    老师你好,acks=all是保证isr列表中的副本同步,如果长时间的大吞吐量,致使isr中只剩下leader,那acks=all实际起到的效果就是只同步leader一个副本,如果此时leader挂掉,那是不是会丢数据?

    作者回复: 也不算丢数据,默认配置下如果ISR为空了,这个分区就不可用了,producer也无法向这个分区发送任何消息了。对于这种情况,Kafka不认为是丢数据

     1
    
  • 云师兄
    2019-10-11
    ack=all时候,生产者向leader发送完数据,而副本是异步拉取的,那生产者写入线程要一直阻塞等待吗

    作者回复: 不会阻塞,你可以认为是不断轮询状态

    
    
  • Tim
    2019-09-29
    老师好,请教2个问题,感谢老师:
    1、它要比较的是Follower 副本落后 Leader 副本的时间是否超过10秒,那这个10s这个时间单位和LEO/HW这种度量单位是如何比较的呢?
    2、老师,follow副本是何时fetch一次leader的呢,多久fetch一次呢?有配置么?

    作者回复: 1. 比较follower LEO赶上leader LEO时的时间差是否超过了10s
    2. 不停地fetch,然后处理,之后再fetch。具体间隔没法配置

    
    
  • 大牛凯
    2019-09-25
    老师好,请问ISR中的副本一定可以保证和leader副本的一致性吗?如果有一种情况是某个ISR中副本与leader副本的lag在ISR判断的边界值,这时如果leader副本挂了的话,还是会有数据丢失是吗?谢谢老师

    作者回复: ISR中的follower副本非常有可能与leader不一致的。如果leader挂了,其他follower又都没有保存该消息,那么该消息是可能丢失的。如果你要避免这种情况,设置producer端的acks=all吧

    
    
  • jc9090kkk
    2019-09-23
    感谢老师的分享,对于这一章节的内容有两个疑问,希望老师能抽时间回复下:
    1.follower落后leader超过特定的时间,也就是超过replica.lag.time.max.ms的值,就会被踢出isr集合,那么这个时间差是如何算出来的,是通过LEO和HW计算出来的么?具体的计算细节能否简单介绍下?如果是落后特定的某个数据量阈值很容易理解,但是换成落后多少秒让我有点难以理解?
    2.ISR集合是动态的,如果后面发现被踢出的follower又追上leader,还会重新回归到ISR集合,这个里面有个矛盾的点,把落后的follower踢出去后,我个人的理解,这个follower不就变成僵尸副本了吗?因为它不是任何一个leader的follower了呀,哪里还有机会继续从leader那同步数据,然后再重新回归到ISR集合中去呢?麻烦老师解答下,谢谢

    作者回复: 1. 计算follower的LEO落后leader LEO的时间
    2. follower被踢出ISR不代表它有问题了,至少不表示它挂了,可能只是阶段性的落后leader。当追上后还是可以将其重新加入到ISR中的

    
    
  • miwucc
    2019-09-16
    ISR中进行选组也可能有10s的消息丢失?

    作者回复: 不会啊。

    
    
  • 樱花落花
    2019-09-10
    老师,ack=all,是保证ISR中的follower同步还是所有的follower同步,还有消费者是只能消费到ISR中的HW处的offset么?

    作者回复: acks=all保证ISR中的所有副本都要同步

    
    
我们在线,来聊聊吧