• 风中花
    2019-07-04
    胡老师您好! 我公司现在就我一个人懂点kafka,但是公司线下却有使用kafka,现在知道我学习这个就交给我了,我现在遇到一个线上问题:消息经常堆积起来,不能消费了,重启服务就能继续消费了。我目前得能力还搞不定,望老师能给指点一二 。谢谢。谢谢

    作者回复: 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance

    主要排查下可能是哪些原因

     2
     17
  • 落霞与孤鹜
    2019-06-30
    这个问题和这期的内容没关系😓。

    如果一个主题,由一个应用的名为A的消费组消费,然后把消费组名改为B,重新发布应用,这个时候是不是从主题的分区头开始消费?如何保证从上次A消费组的最新偏移量处开始消费?

    作者回复: 我假设你指的名字是group.id。那么把A改成B对于Kafka而言就是新的consumer。新consumer从头还是从最新开始消费取决于auto.offset.reset的设置

     1
     5
  • Lei@时速云
    2019-06-29
    👍 胡总出专栏了

    作者回复: 磊总别闹:)

    
     2
  • James
    2019-11-13
    请问老师无法完成提交,是因为重新平衡,是什么原因才会导致.
    刚接触不久,就要修改线上环境问题.但是一直跑了一小时就会下面问题,然后oom
    Group coordinator rl-hadoop5.gci.com:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery


    Offset commit failed on partition dispatch_bus-1 at offset 28978632: The coordinator is not aware of this member.


    Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    展开

    作者回复: 我的经验是先解决OOM的问题吧。至于commit 失败,通常是因为事件处理的速度太慢了

     1
     1
  • godtrue
    2019-08-14
    胡老师好,请教两个小问题
    1:broker通过网络拿到消息后,落盘的规则是什么?来一条消息就落盘一条?还是积攒够一定的量再落盘?
    2:从追主,新的消息是主主动推给各个从的?还是从主动拉取的?如果是主动拉取,从怎么知道主有新的消息的?另外,同步的时候是只要有一条新消息就同步嘛?
    如果其他同学清楚,也请帮忙解答一下,多谢。

    作者回复: 1. 实际上这个交由OS来决定,Kafka不管
    2. 从拉取的方式。Kafka定义了水位的概念来标识拉取的进度

     1
     1
  • 小头针
    2019-07-05
    我们的应用场景是这样的,将采集到的数据接收到kafka,并由kafka再进行生产供下一逻辑消费,消费的数据进行一些业务的修改,最后进入到查询平台中,但是经常会出现采集端的数据与查询端的数据量相差较大的情况,所以我们就简单的统计了数据写入到redis中。

    胡老师,请问Interceptor可以对生产者生产的数据以及消费者消费的数据进行统计么?


    展开

    作者回复: 可以啊,你想在里面实现什么样的逻辑都行。

    
     1
  • 张庆
    2019-07-02
    return null ; 报错了,NullPointException错误,KafkaProducer doSend方法里面获取消息主题和分区。
     1
     1
  • Liam
    2019-06-29
    请问下老师。onsend或onconsumer的执行线程和发送或消费消息的线程是否是同一个线程?加入太多耗时逻辑是否会影响主逻辑?

    作者回复: onsend是producer进程下的线程;onConsume是consumer进程下的线程,它们不是一个进程。我说的是onSend和onAcknowledgement是一个进程下的多个线程。

    
     1
  • yic
    2020-01-23
    胡老师,关于分享的案例。我感觉消息总数和时延的统计都应该放在消费端呢。毕竟消息的发送和消费是异步的,并不能保证生产者发送的消息立刻就被消费者一次消费。
    
    
  • IT小僧
    2020-01-13
    老师好!这个消费者段onConsume是在消费数据之前执行的方法,并不能真正统计实际处理该消费耗费时间的吧。我觉得理应放在onCommit里面。

    作者回复: 是的,你是对的~ 其实放在哪个方法里面取决于你的计时逻辑:)

    
    
  • 小马
    2020-01-09
    老师有两个疑问请教下:
    1、时间的一致性问题。System.currentTimeMillis() - record.timestamp()发送和接收的时间实际上可能来自两台机器,有可能时间不一致,会导致统计结果偏差很大;
    2、消费端代码计算时间差是在循环里面进行的,把System.currentTimeMillis()提到循环外面应该会好一点吧,毕竟这一批消息应该算是同时接收的;
    3、消息总数是在生产端统计的,时延是在消费端统计的,但是如果在消息传输过程中出现部分消息丢失是不是会影响统计的准确性。

    作者回复: 1. 确实有这个问题。用户自己来规避之
    2. 同意~
    3. 我们还是能保证消息不丢失的吧:)

    
    
  • 进击的姬哥
    2019-11-23
    Interceptor处理数据是单条的吗,还是多条数据作为一个集合

    作者回复: 单条消息

    
    
  • James
    2019-11-13
    不知道是kafka问题还是项目问题= =
    
    
  • James
    2019-11-13
    老师,刚才的问题描述,线上环境只有一个消费组消费,看异常好像是一个分区坏了,导致重新平衡.才导致挂了

    作者回复: 嗯嗯,分区坏了?

    
    
  • 此方彼方Francis
    2019-10-28
    老师好,在kafka的interceptor里面实现权限控制的逻辑合适吗?
    假设说有另外一个服务维护了IP和topic之间的关系,在interceptor里面线获取到本机ip有权限的topic列表,然后每次发消息的时候做判断。

    作者回复: Kafka本身也提供了一整套权限控制的逻辑。当然如果你这么用能满足你的需求,我觉得没有任何问题:)

    
    
  • 我已经设置了昵称
    2019-09-24
    老师,遇到个问题, 自己建了个consumer的interceptor,需要在interceptor中收到消息后,发送响应消息给发送方,但发现interceptor这个实例并没有被spring管理起来,就需要自己再new一个producer。但这个kafka配置又是配置在配置中心,也读不到,只能写死了。有啥好的方法吗

    作者回复: 可以复用你在Spring中创建的producer吗?

     1
    
  • 小可
    2019-08-01
    老师好,有个场景想请教下。我们发送的数据是个大json,大概500K,有一百多的字段,其中一个字段450K是图片base64,发送的速度不到100条/秒,像这种大消息体数据的发送,kafka有什么好的办法么?

    作者回复: 没什么太好的方法。你可以试试压缩。其实消息队列本不适合发送太大的消息体

     2
    
  • 撒旦的堕落
    2019-07-31
    我看到老师回答的onsend 和onAcknowledgement是生产者进程下的不同线程 既然都是子线程 为啥又特意提到onAcknowledgement这个方法又处在发送的主路径中 难道是源码中对这两个方法的调用有区别?

    作者回复: 这两个方法是在不同的线程中被调用的

    
    
  • 杨陆伟
    2019-07-19
    System.currentTimeMillis() - record.timestamp() 是否要求生产客户端设置record的timestamp字段?还是Producer Client会自动生成?对于Kafka中的timestamp还搞不太清楚,这对监控比较关键,不知道后面有没有介绍,谢谢

    作者回复: 默认情况下是消息发生时producer程序所在机器时钟

    
    
  • 风中花
    2019-07-09
    又看了一次!
    
    
我们在线,来聊聊吧