offset的提交不知道是不是在kafkaConsumer.commitAsync中调用coordinator.commitOffsetsAsync(offsets,callback)
1、这里设计成异步方式一开始我是比较奇怪的,他是如何保证offset不丢失呢?看了代码才知道在异步返回前会等待ConcurrentLinkQueue<offsetCommitCompletetion>中没有其他的待处理的其他的offset的commit后,才会返回,这里的非阻塞队列是线程安全的,可以避免当前提交冲掉其他的offset的提交
2、真正进行提交的时候也不是调用什么具体操作net的接口,而是向另一个ConcurrentLinkedQueue中注册了一个RequestFutureListener的监听者,当然注册之前使用了AtomicInteger来保证并发安全。
3、每个监听者应该都会由相应的Coordinator轮询处理队列中的待提交请求,将offset提交从具体的Consumer中解耦到每个组的Coordinator中。
当然以上只是个人理解,如有不当欢迎指正。
读完这个代码,我发现kafka在这里保证并发数据一致性时,使用了安全的数据结构+CAS的数据访问,灵活且大大降低了锁机制的粗粒度带来的性能损耗,只是这个代码真不容易写好,真是大牛作品!!
展开