消息队列高手课
李玥
京东零售技术架构部资深架构师
立即订阅
8426 人已学习
课程目录
已完结 41 讲
0/4登录后,你可以任选4讲全文学习。
课前必读 (2讲)
开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”
免费
预习 | 怎样更好地学习这门课?
基础篇 (8讲)
01 | 为什么需要消息队列?
02 | 该如何选择消息队列?
03 | 消息模型:主题和队列有什么区别?
04 | 如何利用事务消息实现分布式事务?
05 | 如何确保消息不会丢失?
06 | 如何处理消费过程中的重复消息?
07 | 消息积压了该如何处理?
08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?
进阶篇 (21讲)
09 | 学习开源代码该如何入手?
10 | 如何使用异步设计提升系统性能?
11 | 如何实现高性能的异步网络传输?
12 | 序列化与反序列化:如何通过网络传输结构化的数据?
13 | 传输协议:应用程序之间对话的语言
14 | 内存管理:如何避免内存溢出和频繁的垃圾回收?
加餐 | JMQ的Broker是如何异步处理消息的?
15 | Kafka如何实现高性能IO?
16 | 缓存策略:如何使用缓存来减少磁盘IO?
17 | 如何正确使用锁保护共享数据,协调异步线程?
18 | 如何用硬件同步原语(CAS)替代锁?
19 | 数据压缩:时间换空间的游戏
20 | RocketMQ Producer源码分析:消息生产的实现过程
21 | Kafka Consumer源码分析:消息消费的实现过程
22 | Kafka和RocketMQ的消息复制实现的差异点在哪?
23 | RocketMQ客户端如何在集群中找到正确的节点?
24 | Kafka的协调服务ZooKeeper:实现分布式系统的“瑞士军刀”
25 | RocketMQ与Kafka中如何实现事务?
26 | MQTT协议:如何支持海量的在线IoT设备?
27 | Pulsar的存储计算分离设计:全新的消息队列设计思路
28 | 答疑解惑(二):我的100元哪儿去了?
案例篇 (7讲)
29 | 流计算与消息(一):通过Flink理解流计算的原理
30 | 流计算与消息(二):在流计算中使用Kafka链接计算任务
31 | 动手实现一个简单的RPC框架(一):原理和程序的结构
32 | 动手实现一个简单的RPC框架(二):通信与序列化
33 | 动手实现一个简单的RPC框架(三):客户端
34 | 动手实现一个简单的RPC框架(四):服务端
35 | 答疑解惑(三):主流消息队列都是如何存储消息的?
测试篇 (2讲)
期中测试丨10个消息队列热点问题自测
免费
期末测试 | 消息队列100分试卷等你来挑战!
结束语 (1讲)
结束语 | 程序员如何构建知识体系?
消息队列高手课
登录|注册

加餐 | JMQ的Broker是如何异步处理消息的?

李玥 2019-08-24
你好,我是李玥。
我们的课程更新到进阶篇之后,通过评论区的留言,我看到有一些同学不太理解,为什么在进阶篇中要讲这些“看起来和消息队列关系不大的”内容呢?
在这里,我跟你分享一下这门课程的设计思路。我们这门课程的名称是“消息队列高手课”,我希望你在学习完这门课程之后,不仅仅只是成为一个使用消息队列的高手,而是设计和实现消息队列的高手。所以我们在设计课程的时候,分了基础篇、进阶篇和案例篇三部分。
基础篇中我们给大家讲解消息队列的原理和一些使用方法,重点是让大家学会使用消息队列。
你在进阶篇中,我们课程设计的重点是讲解实现消息队列必备的技术知识,通过分析源码讲解消息队列的实现原理。希望你通过进阶篇的学习能够掌握到设计、实现消息队列所必备的知识和技术,这些知识和技术也是设计所有高性能、高可靠的分布式系统都需要具备的。
进阶篇的上半部分,我们每一节课一个专题,来讲解设计实现一个高性能消息队列,必备的技术和知识。这里面每节课中讲解的技术点,不仅可以用来设计消息队列,同学们在设计日常的应用系统中也一定会用得到。
前几天我在极客时间直播的时候也跟大家透露过,由我所在的京东基础架构团队开发的消息队列 JMQ,它的综合性能要显著优于目前公认性能非常好的 Kafka。虽然在开发 JMQ 的过程中有很多的创新,但是对于性能的优化这块,并没有什么全新的划时代的新技术,JMQ 之所以能做到这样的极致性能,靠的就是合理地设计和正确地使用已有的这些通用的底层技术和优化技巧。我把这些技术和知识点加以提炼和总结,放在进阶篇的上半部分中。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《消息队列高手课》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(41)

  • 包明
    Requests Queue 内存级的 怎么做到不丢请求?

    作者回复: 你要明白一件事儿,客户端发来一条数据,直到服务端返回给客户端发送成功,这段时间内,数据是允许丢失的。

    数据丢失后,客户端收不到发送成功响应,自然会重试。

    2019-08-26
    14
  • 青舟
    https://github.com/qingzhou413/geektime-mq-rpc.git

    使用netty作为网络库,server和client的io线程数都是1,笔记本4核标压2.3G时间2.3秒。

    作者回复: 👍👍👍

    2019-08-24
    6
  • linqw
    老师有个疑问,帮忙解答下哦,在上面的那个流程图中,WriteThread是单线程从请求队列中获取到消息然后把消息放到journal Cache,开启ReplicationThread、FlushThread进行处理,能否把WriteThread做成分配器了,mq只要保证topic下的队列有序就可以,同一个队列的消息由WriteThread分配给同一个线程进行处理,线程池的形式,线程池中的每个工作线程内部都有个集合保存消息,如果前面没有同一个队列的消息,分配给最空闲的线程进行执行,那这样的话,WriteThread只要分配消息,比如可以对发送过来的消息中要保存的队列属性值进行hash,然后根据hash值判断线程池中的所有线程的消息集合是否有相同队列的消息,有的话分配给同一个线程执行,没有的话最空闲的线程执行,mysql的binlog同步也是有几种这种策略,并发的同步sql,刚开始是基于库,同一个库的sql分配到一个线程执行同步,不同库进行并发,后来是基于redo log的组提交形式,能组提交的sql可以并发的同步,再后来是WRITESET,根据库名、表名、索引(主键和唯一索引)计算hash进行分发策略。

    作者回复: 这个流程中处理的数据已经是被分配过的,单个队列的数据。

    2019-08-25
    5
  • 付永强
    这么看来JMQ基本上是把所有涉及io等待地方步骤进行异步设计。学到了!感谢分享!🤔
    2019-08-28
    4
  • 吾皇万岁万岁万万岁
    请问老师,JMQ在follower节点响应后,就给生产者发送确认消息,此时如果leader节点故障,数据还在JournalCache里面,拿是不是可以认为这部分数据丢失?

    作者回复: 不会丢,因为数据已经复制到了从节点上,leader宕机后,会重新选举出新的leader(也就是之前的某个follower),这个新leader上是有原leader上的全部数据的。

    2019-08-24
    2
    3
  • 许童童
    老师说得很好,想成为一个真成的高手,理解并实践这些基本原理,是必不可少的,不要只停留在知识的表面,要领会技术背后的思想,才能活学活用,实践肯定是少不了的,加油。
    2019-08-24
    2
  • Martin
    https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java

    基于netty4实现的,Macbook pro 2015款 13寸 测试差不多4.2秒左右,老师帮忙看看哪里还可以有优化空间吗

    作者回复: 👍👍👍正确写出这个程序差不多就是这个耗时。

    2019-09-20
    1
  • linqw
    我用java实现了一版,老师帮忙看下哦,评论只能发2000字,其余在评论区补上
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.annotation.Nonnull;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    /**
     * @author linqw
     */
    public class SocketExample {

        private static final Logger LOGGER = LoggerFactory.getLogger(SocketExample.class);

        private static final ReentrantLock LI_WRITE_REENTRANT_LOCK = new ReentrantLock();

        private static final ReentrantLock ZHANG_WRITE_REENTRANT_LOCK = new ReentrantLock();

        private static final int NCPU = Runtime.getRuntime().availableProcessors();

        private final ThreadPoolExecutor serverThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("server-"), new ThreadPoolExecutor.CallerRunsPolicy());

        private final ThreadPoolExecutor clientThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("client-"), new ThreadPoolExecutor.CallerRunsPolicy());

        private volatile boolean started = false;

        /**
         * 俩大爷已经遇见了多少次
         */
        private volatile AtomicInteger count = new AtomicInteger(0);

        /**
         * 总共需要遇见多少次
         */
        private static final int TOTAL = 1;

        private static final String Z0 = " 吃了没,您吶?";

        private static final String Z3 = " 嗨!吃饱了溜溜弯儿。";
        private static final String Z5 = " 回头去给老太太请安!";
        private static final String L1 = " 刚吃。";
        private static final String L2 = " 您这,嘛去?";
        private static final String L4 = " 有空家里坐坐啊。";

    作者回复: 还是贴一下GitHub的链接把,也方便其他同学学习。

    2019-08-25
    9
    1
  • 豆沙包
    老师,我在运行你的老大爷代码的时候发现了一个问题,经常报读长度故障。我调试了一下,应该是因为张大爷阻塞在readfrom的时候,李大爷count++了。然后count大于total,client释放了tcp。这导致readfrom无法读出准确数据。我在返回读数据故障前判断了一下count和total的关系,如果count大于等于total正常返回,就没有再报过异常了。您看我分析的是否有问题呢

    作者回复: 这个地方是因为,完成了所有会话之后,关闭socket连接的时候,没有先等待2个大爷读写socket的4个线程都结束导致的。

    虽然不影响结果,但还是可以改进一下,做到优雅的退出。

    2019-08-24
    1
  • 李冲
    借用老师的的go源码经过读,写,去掉锁3次演进,分别在金山云ECS(2核4G)上跑到3.1/2.4/1.4秒的样子。
    最终的文件:https://github.com/lichongsw/algorithm/blob/master/duplex_communication_optimization_3_no_write_lock.go

    作者回复: 👍👍👍

    2019-11-26
    1
  • 张三丰
    理解老师的讲课思路,就因为老师这么讲我才购买这个专栏,非常值!
    2019-10-23
  • 李心宇🦉
    老师好,我对JMQ的broker接收生产者请求并写入消息的流程有个疑问。
    在处理完数据落盘和多节点数据复制之后,要给生产者回复响应了,这时候broker如何能找到生产者呢?我理解是第一步生产者发送请求建立的TCP连接句柄没有释放,最后再通过这个连接句柄来write响应。这样的话,还是每个连接在得到响应之前不能释放需要占用一个线程啊。请问是怎么做到在第一步接收响应阶段只需要很少的线程的?是不是利用异步非阻塞,在线程里设置大量的协程来处理请求?

    作者回复: 你的大部分理解都没问题,有一个小问题是,维持一个TCP连接并不一定需要占用一个线程。只有在这个连接上执行收发数据的时候,才需要占用线程。收到请求处理完,把请求交给其它线程处理后,当前线程就可以释放了。直到响应生成后,这段时间只维持TCP连接是不需要占用任何线程的。

    2019-10-23
    1
  • 谢清
    刷了下评论,青舟写的最好,其次Martin
    青舟
    https://github.com/qingzhou413/geektime-mq-rpc.git
    Martin
    https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java
    2019-10-21
  • Tesla
    老师好,请问一下WriteThread中将消息序列化并维护起止位置offset,这个offset是否就是头部信息中保存消息长度呢?比如04abcd08abcdefgh,通过04就可以知道abcd消息的起止位置,下一个读到的08则代表下一条消息的起止位置?

    作者回复: 不是这样的,以你这个例子来说,假设abcd这条消息前面再也没有其它消息的,它是最前的一条,abcd这条消息的offset就是0, abcdefgh这条消息的offset就是6.

    2019-10-14
    1
  • 丁丁历险记
    无用之用,方为大用。
    2019-10-01
  • Six Days
    老师,希望能给出一份java 版本的参考代码以供学习,谢谢你了

    作者回复: 后续我们会把所有代码都开源出来的。

    2019-09-26
  • godtrue
    老师好, ReponseThreads 这组线程,怎么实现异步并行的没太理解,她不是需要等待复制响应或者落盘后才发消息给客户端嘛?

    作者回复: 是的,这里的“异步”是指,发响应给客户端并不是由复制或者刷盘线程来执行的,而是单独的一组线程。给每个客户端发响应这个操作是可以并行执行的。

    2019-09-24
  • mark
    做了一点优化,到了 3s

    https://gist.github.com/vipwzw/382f68995e26a2696e4da8cc5039c595

    作者回复: 👍👍👍
    最好能说一下你优化了哪些地方。

    2019-09-21
    1
  • Geek_3cb8e5
    老师,您的课程安排非常的好, 我们需要学习轮子是如何造的。
    2019-09-19
  • 顶新
    FlushThread 和 ReplicationThread线程对内存的链表 Pending Callbacks 中数据的更新,然后ResponseThread扫描链表Pending Callbacks,批量取出符合 QOS 规则的响应及超时的响应,然后并发返回给客户端。这块也存在对 Pending Callbacks 存在互斥锁吧?不知道理解的对不对,还请老师答疑 :)

    作者回复: 这个地方是不需要锁的,写入和移出Callback分别是在2个固定的线程中分别执行,这个链表写入的时候直接是尾部追加,执行Callback的时候在头部移出。并且,执行Callback的时候一定链表中有数据的时候,所以也不需要加锁担心移除越界。所以这个地方是不需要锁的

    2019-09-18
收起评论
41
返回
顶部