• 胡夕
    置顶
    2020-05-06
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了Kafka中位移索引和时间戳索引的不同。课后我让你尝试自行编写一个函数,该函数实现类似于CEILING 函数的位移查找逻辑,即返回不小于给定位移值 targetOffset 的最小位移值和对应的物理文件位置。以下是我给出的代码: def ceilingLookup(targetOffset: Long): OffsetPosition = { maybeLock(lock) { val idx = mmap.duplicate val slot = smallestUpperBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if (slot == -1) OffsetPosition(baseOffset, 0) else parseEntry(idx, slot) } } okay,你是怎么考虑的呢?我们可以一起讨论下。
    
    6
  • Ball
    2020-10-04
    思考题:监控 Request 队列当前的使用情况,应该是 newGauge(requestQueueSizeMetricName, () => requestQueue.size) 这个方法里面,requestQueueSizeMetricName 这个指标。

    作者回复: 这个指标名称实际上是RequestQueueSize:)

    
    1
  • delete is create
    2020-04-28
    感觉kafka中大量使用协调器(findCoordinator) 老师协调器到底是做什么的 我今天看一个kafkaAdminClient中获取消费者组信息里面也有这个东西

    作者回复: 协调器是做协调用的:) hmmm... sorry,哈哈哈 目前Kafka中有两类协调器,GroupCoordinator和TransactionCoordinator。前者用于管理消费者组,后者是用于事务管理的。就以消费者组来说,Coordinator需要生成消费者组的元数据信息,负责维护组的位移提交和获取以及全套的rebalance流程支持。

    共 2 条评论
    1
  • lucas
    2021-03-28
    老师好,请教一个问题,我flink1.12.1写入kafka2.3.0的时候报错,而且是运行几个小时之后才会报 Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Transiting to fatal error state due to org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1

    作者回复: 还是版本不匹配的问题

    
    
  • Geek_ae1f59
    2020-06-11
    老师你好,我们kafka集群responseSendTimeMs监控指标非常大,而且部分节点还抛出异常:java.io.IOException: Connection to xx was disconnected before the response was read,关于这个有什么排查思路吗?

    作者回复: 排查网络的问题吧

    
    
  • sljoai
    2020-06-10
    “StartThrottlingResponse:用于通知 Broker 的 Socket Server 组件某个 TCP 连接通信通道开始被限流(throttling). ”这里的限流如何理解呢?是物理网络上的限制,还是可以在Kafka中设置的?!

    作者回复: Kafka实现的。基本上可以认为是人为引入延时

    
    
  • 空知
    2020-05-05
    用kafka-configs 命令把Processor 线程池中的线程数量减少 会导致响应丢失的吧,这个不用处理或者警告限制嘛?

    作者回复: 的确可能丢失,到时候客户端自行处理。不过我同意应该加个告警。如果你有兴趣可以提交一个patch给社区,修改RequestChannel.scala的sendResponse方法代码即可:)

    
    
  • 小崔
    2020-05-05
    对于一个特定的请求,TotalTimeMs = RequestQueueTimeMs + LocalTimeMs + RemoteTimeMs,对吗?还是说LocalTimeMs包含了(RemoteTimeMs + 本地处理时间)?

    作者回复: 前一个是对的。local time是指本地broker写入磁盘的时间

    共 2 条评论
    
  • 镜子中间
    2020-05-01
    这节课的信息量很大,干货满满,就是听完下来云里雾里,打算对照源码和老师的讲解梳理一遍

    作者回复: 加油加油,如果有问题就提出来, 我第一时间回复

    
    
  • 🐾
    2020-04-29
    胡总下午好,专栏后面会介绍如何在IDEA进行调试吗?想看下客户端发送消息到服务端的整个流程,再结合文章介绍的功能点,功力提升效果会比较好些

    作者回复: hmmm,实话说本专栏聚焦于Broker端代码。客户端染指的很少。不过你可以将整个流程拆成broker端调试+clients端调试。特别是broker端调试,直接在KafkaApis.scala中打断点就可以

    共 2 条评论
    