消息队列高手课
李玥
京东零售技术架构部资深架构师
立即订阅
8426 人已学习
课程目录
已完结 41 讲
0/4登录后,你可以任选4讲全文学习。
课前必读 (2讲)
开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”
免费
预习 | 怎样更好地学习这门课?
基础篇 (8讲)
01 | 为什么需要消息队列?
02 | 该如何选择消息队列?
03 | 消息模型:主题和队列有什么区别?
04 | 如何利用事务消息实现分布式事务?
05 | 如何确保消息不会丢失?
06 | 如何处理消费过程中的重复消息?
07 | 消息积压了该如何处理?
08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?
进阶篇 (21讲)
09 | 学习开源代码该如何入手?
10 | 如何使用异步设计提升系统性能?
11 | 如何实现高性能的异步网络传输?
12 | 序列化与反序列化:如何通过网络传输结构化的数据?
13 | 传输协议:应用程序之间对话的语言
14 | 内存管理:如何避免内存溢出和频繁的垃圾回收?
加餐 | JMQ的Broker是如何异步处理消息的?
15 | Kafka如何实现高性能IO?
16 | 缓存策略:如何使用缓存来减少磁盘IO?
17 | 如何正确使用锁保护共享数据,协调异步线程?
18 | 如何用硬件同步原语(CAS)替代锁?
19 | 数据压缩:时间换空间的游戏
20 | RocketMQ Producer源码分析:消息生产的实现过程
21 | Kafka Consumer源码分析:消息消费的实现过程
22 | Kafka和RocketMQ的消息复制实现的差异点在哪?
23 | RocketMQ客户端如何在集群中找到正确的节点?
24 | Kafka的协调服务ZooKeeper:实现分布式系统的“瑞士军刀”
25 | RocketMQ与Kafka中如何实现事务?
26 | MQTT协议:如何支持海量的在线IoT设备?
27 | Pulsar的存储计算分离设计:全新的消息队列设计思路
28 | 答疑解惑(二):我的100元哪儿去了?
案例篇 (7讲)
29 | 流计算与消息(一):通过Flink理解流计算的原理
30 | 流计算与消息(二):在流计算中使用Kafka链接计算任务
31 | 动手实现一个简单的RPC框架(一):原理和程序的结构
32 | 动手实现一个简单的RPC框架(二):通信与序列化
33 | 动手实现一个简单的RPC框架(三):客户端
34 | 动手实现一个简单的RPC框架(四):服务端
35 | 答疑解惑(三):主流消息队列都是如何存储消息的?
测试篇 (2讲)
期中测试丨10个消息队列热点问题自测
免费
期末测试 | 消息队列100分试卷等你来挑战!
结束语 (1讲)
结束语 | 程序员如何构建知识体系?
消息队列高手课
登录|注册

32 | 动手实现一个简单的RPC框架(二):通信与序列化

李玥 2019-10-08
你好,我是李玥。
继续上节课的内容,这节课我们一起来实现这个 RPC 框架的通信和序列化部分。如何实现高性能的异步通信、如何来将结构化的数据序列化成字节流,用于网络传输或者存储到文件中,这两部分内容,我在进阶篇中都有在专门的课程分别讲解过。
网络传输和序列化这两部分的功能相对来说是非常通用并且独立的,在设计的时候,只要能做到比较好的抽象,这两部的实现,它的通用性是非常强的。不仅可以用于我们这个例子中的 RPC 框架中,同样可以直接拿去用于实现消息队列,或者其他需要互相通信的分布式系统中。
我们在实现这两部分的时候,会尽量以开发一个高性能的生产级系统这样的质量要求来设计和实现,但是为了避免代码过于繁杂影响你理解主干流程,我也会做适当的简化,简化的部分我会尽量给出提示。

如何设计一个通用的高性能序列化实现?

我们先来实现序列化和反序列化部分,因为后面讲到的部分会用到序列化和反序列化。
首先我们需要设计一个可扩展的,通用的序列化接口,为了方便使用,我们直接使用静态类的方式来定义这个接口(严格来说这并不是一个接口)。
public class SerializeSupport {
public static <E> E parse(byte [] buffer) {
// ...
}
public static <E> byte [] serialize(E entry) {
// ...
}
}
上面的 parse 方法用于反序列化,serialize 方法用于序列化。如果你对 Java 语言不是特别的熟悉,可能会看不懂<E>是什么意思,这是 Java 语言泛型机制,你可以先忽略它。看一下如何来使用这个类就明白了:
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《消息队列高手课》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(11)

  • 每天晒白牙
    今日得到
    关键字:背压机制(back pressure)
    当客户端同步请求服务端时,客户端需要等待服务端处理完请求并返回响应,在此期间,客户端需要等待,这是一种天然背压机制

    那换成异步请求呢?
    客户端异步请求服务端时,不会等待服务端处理请求,这样就丧失了同步请求的那种天然背压。
    如果服务器处理请求的速度慢于客户端发送请求的速度,就会导致InFlightRequest(在途请求,即还未被服务端处理或响应的请求)越来越多,如果这些在途请求保存在内存中,就可能导致内存飙升,进而引发频繁FGC,之前线上遇到过几次FGC的案例【在公众号:每天晒白牙 中分享过一个vertx-redis-client客户端的案例 https://mp.weixin.qq.com/s/fWsy26VeUvb8yPKON3OTmA,后面也遇到rpc框架中也出现类似的问题】都是因为这些在途请求保存在一个无界队列中,请求得不到处理,导致内存占用过高。

    想要避免上面的情况,需要我们主动引入背压机制,即在服务端处理不过来的情况要限制客户端的请求速率,具体的做法应该有很多,我列举几个
    1.把无界队列换成有界队列,队列满了就不让添加了
    2.在往队列中添加请求的时候获取许可,服务端处理完请求释放许可,如果许可没了就阻塞客户端的请求或返回异常
    ——每天晒白牙
    2019-10-10
    4
  • Switch
    使用fastjson实现,改动代码见:https://gist.github.com/Switch-vov/8cb76aabd1e1addcdbec205cc06d9023

    作者回复: 赞交作业的同学👍

    2019-11-02
    2
  • 书中迷梦
    requestId要保持唯一性,不然消息就乱了!!而且不用保持全局唯一性,只要保证在单个服务中唯一就好
    2019-11-14
  • 亚洲舞王.尼古拉斯赵四
    想用json的方式,搞一个泛型的CommonSerializer<T>出来,序列化好说,但是在进行解析成对象的时候,发现我通过泛型无法拿到原本的Class,😂,想了半天方法用反射去搞,但是觉得这样不好,查了一下发现一般这种情况是通过传入一个你想要解析的最终类型的Class对象,但是又要求不该接口,这样我就不能传进去一个Class,无奈,只能针对每个类型实现一下Serializer接口
    2019-11-06
  • Tim Zhang
    有一点不是很理解,为啥typeMap不可以是(type, serializer),通过type获取序列化实现类一步操作,现在看源码是通过type拿到class,再class拿到serializer

    作者回复: 这样保存也是可以的,因为我们用二个map,也就是二对儿kv来保存三种数据(type, class和serializer),必然由一种数据要重复保存一下。

    获取序列化实现类的时候,需要拿到class对象的目的是检查一下serializer和class对应的数据类型是否一致。

    2019-10-30
  • JackJin
    有点不理解,Server启动时,向NameService注册接口时,序列化对象是MetadataSerializer?

    作者回复: 是这样的。

    2019-10-18
  • 陈华应
    动手动手,一定要动手,netty不熟,看起来吃力,但也收获满满,每明白一点都是进步~~~

    作者回复: 是这样的,一定要动手写代码。

    2019-10-10
  • A9
    使用JSON字符串进行序列化,String的序列化保持原样。MetaData和RpcRequest修改了size parse serialize函数 https://gist.github.com/WangYangA9/210ca898525832cba8ddd57ae1ae3d13

    作者回复: 👍👍👍

    2019-10-09
  • Better me
    ResponseInvocation 这个类是相当于一个回调类的形式吧。“就是根据响应头中的 requestId,去在途请求 inFlightRequest 中查找对应的 ResponseFuture,设置返回值并结束这个 ResponseFuture 就可以了。”这里的设置返回值是指code码形式吗?这里的作用是还需要在返回给服务端确认收到吗?老师有空看看

    作者回复: 这里面的返回值不是状态码,而是RPC调用的返回值,比如我们这个HelloService中,这个返回值就是调用HelloServiceImpl.hello()方法的返回值。

    2019-10-09
    1
  • A9
    老师的课程很多地方让人茅塞顿开,感觉从高中毕业之后就再也没有过这种抽丝剥茧学习知识的感觉了。感谢带来这么好的课程

    作者回复: 感谢,希望你能通过学习有收获,有提高。

    2019-10-08
  • lecy_L
    准点研读了一遍,受益匪浅。明天利用空闲时间尝试完成思考题。
    2019-10-08
收起评论
11
返回
顶部