• 包明
    2019-08-26
    Requests Queue 内存级的 怎么做到不丢请求?

    作者回复: 你要明白一件事儿,客户端发来一条数据,直到服务端返回给客户端发送成功,这段时间内,数据是允许丢失的。

    数据丢失后,客户端收不到发送成功响应,自然会重试。

    
     15
  • 青舟
    2019-08-24
    https://github.com/qingzhou413/geektime-mq-rpc.git

    使用netty作为网络库,server和client的io线程数都是1,笔记本4核标压2.3G时间2.3秒。

    作者回复: 👍👍👍

    
     7
  • linqw
    2019-08-25
    老师有个疑问,帮忙解答下哦,在上面的那个流程图中,WriteThread是单线程从请求队列中获取到消息然后把消息放到journal Cache,开启ReplicationThread、FlushThread进行处理,能否把WriteThread做成分配器了,mq只要保证topic下的队列有序就可以,同一个队列的消息由WriteThread分配给同一个线程进行处理,线程池的形式,线程池中的每个工作线程内部都有个集合保存消息,如果前面没有同一个队列的消息,分配给最空闲的线程进行执行,那这样的话,WriteThread只要分配消息,比如可以对发送过来的消息中要保存的队列属性值进行hash,然后根据hash值判断线程池中的所有线程的消息集合是否有相同队列的消息,有的话分配给同一个线程执行,没有的话最空闲的线程执行,mysql的binlog同步也是有几种这种策略,并发的同步sql,刚开始是基于库,同一个库的sql分配到一个线程执行同步,不同库进行并发,后来是基于redo log的组提交形式,能组提交的sql可以并发的同步,再后来是WRITESET,根据库名、表名、索引(主键和唯一索引)计算hash进行分发策略。
    展开

    作者回复: 这个流程中处理的数据已经是被分配过的,单个队列的数据。

    
     5
  • 付永强
    2019-08-28
    这么看来JMQ基本上是把所有涉及io等待地方步骤进行异步设计。学到了!感谢分享!🤔
    
     4
  • 吾皇万岁万岁万万岁
    2019-08-24
    请问老师,JMQ在follower节点响应后,就给生产者发送确认消息,此时如果leader节点故障,数据还在JournalCache里面,拿是不是可以认为这部分数据丢失?

    作者回复: 不会丢,因为数据已经复制到了从节点上,leader宕机后,会重新选举出新的leader(也就是之前的某个follower),这个新leader上是有原leader上的全部数据的。

     2
     3
  • ub8
    2019-12-14
    老师,文中提到的 “我们把回复响应这个需要等待资源的操作,也异步放到其他的线程中去执行。”;这个是怎么实现的呢? ResponseThread ,和 RequestThread 是如何对应上的?

    作者回复: 一般简单常用的做法都是在处理request的线程中执行业务逻辑,然后把response返回。

    其实从网络层面来看,Request和Response只是客户端和服务端互相发送的两段数据,和服务端处理用什么线程完全没有关系。

    Request和Response如何对应,这个取决于网络传输协议是怎么设计的。这个我们在课程中讲过。

    在服务端实现中,可以接受到Request之后,进行异步处理,异步处理完成之后,无论在什么线程中,只要能拿到处理结果构建出Response,通过网络发给客户端就可以了。

    
     2
  • 许童童
    2019-08-24
    老师说得很好,想成为一个真成的高手,理解并实践这些基本原理,是必不可少的,不要只停留在知识的表面,要领会技术背后的思想,才能活学活用,实践肯定是少不了的,加油。
    
     2
  • 李冲
    2019-11-26
    借用老师的的go源码经过读,写,去掉锁3次演进,分别在金山云ECS(2核4G)上跑到3.1/2.4/1.4秒的样子。
    最终的文件:https://github.com/lichongsw/algorithm/blob/master/duplex_communication_optimization_3_no_write_lock.go

    作者回复: 👍👍👍

     1
     1
  • 谢清
    2019-10-21
    刷了下评论,青舟写的最好,其次Martin
    青舟
    https://github.com/qingzhou413/geektime-mq-rpc.git
    Martin
    https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java
    展开
    
     1
  • Martin
    2019-09-20
    https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java

    基于netty4实现的,Macbook pro 2015款 13寸 测试差不多4.2秒左右,老师帮忙看看哪里还可以有优化空间吗

    作者回复: 👍👍👍正确写出这个程序差不多就是这个耗时。

    
     1
  • linqw
    2019-08-25
    我用java实现了一版,老师帮忙看下哦,评论只能发2000字,其余在评论区补上
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.annotation.Nonnull;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    /**
     * @author linqw
     */
    public class SocketExample {

        private static final Logger LOGGER = LoggerFactory.getLogger(SocketExample.class);

        private static final ReentrantLock LI_WRITE_REENTRANT_LOCK = new ReentrantLock();

        private static final ReentrantLock ZHANG_WRITE_REENTRANT_LOCK = new ReentrantLock();

        private static final int NCPU = Runtime.getRuntime().availableProcessors();

        private final ThreadPoolExecutor serverThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("server-"), new ThreadPoolExecutor.CallerRunsPolicy());

        private final ThreadPoolExecutor clientThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("client-"), new ThreadPoolExecutor.CallerRunsPolicy());

        private volatile boolean started = false;

        /**
         * 俩大爷已经遇见了多少次
         */
        private volatile AtomicInteger count = new AtomicInteger(0);

        /**
         * 总共需要遇见多少次
         */
        private static final int TOTAL = 1;

        private static final String Z0 = " 吃了没,您吶?";

        private static final String Z3 = " 嗨!吃饱了溜溜弯儿。";
        private static final String Z5 = " 回头去给老太太请安!";
        private static final String L1 = " 刚吃。";
        private static final String L2 = " 您这,嘛去?";
        private static final String L4 = " 有空家里坐坐啊。";
    展开

    作者回复: 还是贴一下GitHub的链接把,也方便其他同学学习。

     9
     1
  • 豆沙包
    2019-08-24
    老师,我在运行你的老大爷代码的时候发现了一个问题,经常报读长度故障。我调试了一下,应该是因为张大爷阻塞在readfrom的时候,李大爷count++了。然后count大于total,client释放了tcp。这导致readfrom无法读出准确数据。我在返回读数据故障前判断了一下count和total的关系,如果count大于等于total正常返回,就没有再报过异常了。您看我分析的是否有问题呢

    作者回复: 这个地方是因为,完成了所有会话之后,关闭socket连接的时候,没有先等待2个大爷读写socket的4个线程都结束导致的。

    虽然不影响结果,但还是可以改进一下,做到优雅的退出。

    
     1
  • leslie
    2019-08-24
        今天老师重新梳理之后知道大致需要哪些知识了,借着之前老师的教诲做个总结:
        基础知识
        1.网络方面:消息队列所用的知识确实只是使用在应用层关于全双工这块知识,其它层次暂不涉及
        2.程序:我们只需要掌握常规的编程手法就行;至于我们去用何种语言实现不重要的,重要的是基于问题通过代码高效的解决就行
        3.操作系统:我们需要明白系统进程之间的调用,从而才能合理的去实现异步和并发
        课程的学习
        1.老师其实是最初在传授消息队列的由来、框架、不同消息队列的使用场景
        2.教授整个消息队列的生产者和消费者之间的关系和特性:让我们理解消息队列
        3.通过上述基础:后面的课程就是不断的通过真实现状理论+实际案例去操作动手;渔授予了,就要开始让我们捕鱼了,
        通过我们捕鱼的问题去修正我们的错误,提升我们捕鱼的能力。这是我学习到现在对于课程学习的感受。
        代码能力偏弱:不过后面会尽力去做;毕竟课程涉及的东西和知识,为了学习好课程一直在跟刘老师课并用刘老师的学习方式在补充学习中所不足;
        知识的课程是跟上了,代码的实践落下了;尽力去做尽力跟上老师的步伐吧,希望老师课程结课时能达到自己最初的学习目标。
    展开
    
     1
  • 张三丰
    2019-10-23
    理解老师的讲课思路,就因为老师这么讲我才购买这个专栏,非常值!
    
    
  • 李心宇🦉
    2019-10-23
    老师好,我对JMQ的broker接收生产者请求并写入消息的流程有个疑问。
    在处理完数据落盘和多节点数据复制之后,要给生产者回复响应了,这时候broker如何能找到生产者呢?我理解是第一步生产者发送请求建立的TCP连接句柄没有释放,最后再通过这个连接句柄来write响应。这样的话,还是每个连接在得到响应之前不能释放需要占用一个线程啊。请问是怎么做到在第一步接收响应阶段只需要很少的线程的?是不是利用异步非阻塞,在线程里设置大量的协程来处理请求?

    作者回复: 你的大部分理解都没问题,有一个小问题是,维持一个TCP连接并不一定需要占用一个线程。只有在这个连接上执行收发数据的时候,才需要占用线程。收到请求处理完,把请求交给其它线程处理后,当前线程就可以释放了。直到响应生成后,这段时间只维持TCP连接是不需要占用任何线程的。

     1
    
  • Tesla
    2019-10-14
    老师好,请问一下WriteThread中将消息序列化并维护起止位置offset,这个offset是否就是头部信息中保存消息长度呢?比如04abcd08abcdefgh,通过04就可以知道abcd消息的起止位置,下一个读到的08则代表下一条消息的起止位置?

    作者回复: 不是这样的,以你这个例子来说,假设abcd这条消息前面再也没有其它消息的,它是最前的一条,abcd这条消息的offset就是0, abcdefgh这条消息的offset就是6.

     1
    
  • 丁丁历险记
    2019-10-01
    无用之用,方为大用。
    
    
  • Six Days
    2019-09-26
    老师,希望能给出一份java 版本的参考代码以供学习,谢谢你了

    作者回复: 后续我们会把所有代码都开源出来的。

    
    
  • godtrue
    2019-09-24
    老师好, ReponseThreads 这组线程,怎么实现异步并行的没太理解,她不是需要等待复制响应或者落盘后才发消息给客户端嘛?

    作者回复: 是的,这里的“异步”是指,发响应给客户端并不是由复制或者刷盘线程来执行的,而是单独的一组线程。给每个客户端发响应这个操作是可以并行执行的。

    
    
  • mark
    2019-09-21
    做了一点优化,到了 3s

    https://gist.github.com/vipwzw/382f68995e26a2696e4da8cc5039c595

    作者回复: 👍👍👍
    最好能说一下你优化了哪些地方。

     1
    
我们在线,来聊聊吧