陈天 · Rust 编程第一课
陈天
Tubi TV 研发副总裁
23195 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 65 讲
基础篇 (21讲)
陈天 · Rust 编程第一课
15
15
1.0x
00:00/00:00
登录|注册

35|实操项目:如何实现一个基本的MPSC channel?

我们实现了一个 unbounded MPSC channel,如果要将其修改为 bounded MPSC channel(队列大小是受限的),需要怎么做?
当你有了扎实的单元测试覆盖后,再做重构,比如最后我们做和性能相关的重构,就变得轻松很多,因为只要cargo test通过,起码这个重构没有引起任何回归问题(regression bug)。
在开发产品的时候,这也是一种非常有效的手段,可以让我们通过测试完善设计,最终得到一个能够让测试编译通过的、完全没有实现代码、只有接口的版本。
不同于以往的实操项目,这一讲,我们完全顺着需求写测试,然后在写测试的过程中进行数据结构和接口的设计。
今天我们一起研究了如何使用 atomics 和 Condvar,结合 VecDeque 来创建一个 MPSC unbounded channel。
另外,就是降低加锁的频率,对于消费者来说,如果我们能够一次性把队列中的所有数据都读完缓存起来,以后在需要的时候从缓存中读取,这样就可以大大减少消费者加锁的频次。
之前我们讲到,优化锁的手段无非是减小临界区的大小,让每次加锁的时间很短,这样冲突的几率就变小。
有没有可能优化锁呢?
然而,每次读写都需要获取锁,虽然锁的粒度很小,但还是让整体的性能打了个折扣。
从功能上来说,目前我们的 MPSC unbounded channel 没有太多的问题,可以应用在任何需要 MPSC channel 的场景。
如果要将其修改为 bounded MPSC channel(队列大小是受限的),需要怎么做?
重构和优化,我们可以在 Receiver 的结构中放一个 cache。
目前还缺乏 Receiver 的 Iterator 的实现,这个很简单,就是在 next() 里调用 recv() 方法。
接下来我们看生产者的功能怎么实现。
对于消费者,我们主要需要实现 recv 方法。
创建 unbounded channel 的接口很简单。
创建一个新的项目 cargo new con_utils --lib。在 cargo.toml 中添加 anyhow 作为依赖。
之前我们会从需求的角度来设计接口和数据结构,今天我们就换种方式,完全站在使用者的角度,用使用实例(测试)来驱动接口和数据结构的设计。
既然没有 Sender 了要报错,那么如果没有 Receiver了,Sender 发送时是不是也应该错误返回?这个需求和上面类似,就不赘述了。
顺着刚才的多个 sender想,如果现在所有 Sender 都退出作用域,Receiver 继续接收,到没有数据可读了,该怎么处理?
接下来考虑当队列空的时候,receiver 所在的线程会被阻塞这个需求。那么,如何对这个需求进行测试呢?这并不简单,我们没有比较直观的方式来检测线程的状态。不过,我们可以通过检测“线程是否退出”来间接判断线程是否被阻塞。
由于需要的是 MPSC,所以,我们允许多个 sender 往 channel 里发送数据。
为了实现刚才说的 MPSC channel,都有什么需求呢?首先,生产者可以产生数据,消费者能够消费产生出来的数据,也就是基本的 send/recv。
之前我们谈论了如何在搜索引擎的 Index writer 上使用 MPSC channel:要更新 index 的上下文有很多(可以是线程也可以是异步任务),而 IndexWriter 只能是唯一的。为了避免在访问 IndexWriter 时加锁,我们可以使用 MPSC channel,在多个上下文中给 channel 发消息,然后在唯一拥有 IndexWriter 的线程中读取这些消息,非常高效。
你也许会觉得不太过瘾,而且 SpinLock 也不是经常使用的并发原语,那么今天,我们试着实现一个使用非常广泛的 MPSC channel 如何?
通过上两讲的学习,相信你已经意识到,虽然并发原语看上去是很底层、很神秘的东西,但实现起来也并不像想象中的那么困难,尤其是在 Rust 下,在中,我们用了几十行代码就实现了一个简单的 SpinLock。
标题:实操项目:如何实现一个基本的MPSC channel?
思考题
小结
性能优化
实现 MPSC channel
测试驱动的设计
参考文章
实操项目:如何实现一个基本的MPSC channel?

该思维导图由 AI 生成,仅供参考

你好,我是陈天。
通过上两讲的学习,相信你已经意识到,虽然并发原语看上去是很底层、很神秘的东西,但实现起来也并不像想象中的那么困难,尤其是在 Rust 下,在第 33 讲中,我们用了几十行代码就实现了一个简单的 SpinLock。
你也许会觉得不太过瘾,而且 SpinLock 也不是经常使用的并发原语,那么今天,我们试着实现一个使用非常广泛的 MPSC channel 如何?
之前我们谈论了如何在搜索引擎的 Index writer 上使用 MPSC channel:要更新 index 的上下文有很多(可以是线程也可以是异步任务),而 IndexWriter 只能是唯一的。为了避免在访问 IndexWriter 时加锁,我们可以使用 MPSC channel,在多个上下文中给 channel 发消息,然后在唯一拥有 IndexWriter 的线程中读取这些消息,非常高效。
好,来看看今天要实现的 MPSC channel 的基本功能。为了简便起见,我们只关心 unbounded MPSC channel。也就是说,当队列容量不够时,会自动扩容,所以,任何时候生产者写入数据都不会被阻塞,但是当队列中没有数据时,消费者会被阻塞

测试驱动的设计

之前我们会从需求的角度来设计接口和数据结构,今天我们就换种方式,完全站在使用者的角度,用使用实例(测试)来驱动接口和数据结构的设计。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文详细介绍了如何在Rust中实现一个高效的MPSC(多生产者单消费者)channel。通过测试驱动的设计方法,作者逐步展示了实现MPSC channel的需求和相应的代码实现。文章首先描述了基本的send/recv需求,并给出了相应的单元测试。接着,作者展示了多个sender发送数据的需求,并提出了对应的单元测试。然后,作者讨论了当队列为空时receiver被阻塞的需求,并给出了相应的单元测试。随后,作者提出了当所有sender退出后receiver继续接收数据的需求,并给出了相应的单元测试。最后,作者讨论了当没有receiver时sender发送数据应该报错的需求,并给出了相应的单元测试。整个过程中,作者使用了Mutex、Condvar和AtomicUsize等并发原语,展示了如何通过这些原语实现MPSC channel。读者可以通过本文了解如何使用Rust语言实现一个高效的MPSC channel,以及如何通过测试驱动的设计方法逐步满足不同的需求。文章内容详实,对于想要深入了解Rust并发编程的读者具有很高的参考价值。文章还介绍了如何优化MPSC channel的性能,通过减小临界区的大小和降低加锁的频率来提升整体性能。同时,作者提出了思考题,引导读者思考如何将unbounded MPSC channel修改为bounded MPSC channel。整体而言,本文是一篇深入探讨Rust并发编程的优质文章,对于想要深入学习Rust并发编程的读者具有很高的参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《陈天 · Rust 编程第一课》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(18)

  • 最新
  • 精选
  • 乌龙猹
    这清晰的逻辑,完美诠释TDD 提前预定老师未来推出的 elixir 课程

    作者回复: :)

    2021-11-17
    11
  • 罗杰
    老师在一遍遍的重复 TDD,然后我把 TDD 用在了现在的 Go 项目中,效果非常好,虽然开发的时间增长了,但是代码质量显著提高了。

    作者回复: 嗯,TDD 强调关注于需求,强调测试先行,然后再一点点实现,一个 case 一个 case 跑通,有些过于模式化。我希望给大家带来的思考是,通过 test 来理解需求,然后把 test 作为数据结构和接口设计的一环,通过 test 不断完善数据结构和接口的设计,最后再实现。我个人还是习惯在接口确定后,一次性把系统实现,而不是一点点实现,测试,实现,测试。

    2021-11-17
    2
    5
  • 彭亚伦
    一边追这最新的课程更新, 一边反复温习前面的课程; 之前追《westworld》都没这么过瘾过~

    作者回复: 👍

    2021-11-17
    1
  • Colt
    最喜欢老师的实践课,也许rust的知识学得还一知半解,但是跟着老师的思路敲代码感觉非常爽,很多时候一个顺便就完成了需求

    作者回复: 👍

    2021-12-05
  • wowotuo
    也特别期待把异步tokio、元编程讲透

    作者回复: 👍

    2021-11-24
  • wowotuo
    这个实操项目非常有意义,让我对这块有了一个体系性、深入的认知,值得多读多看。

    作者回复: 👍

    2021-11-24
  • 罗同学
    我想请问一下,实现这个主要是为了理解channel 原理,这个案例可以用于实际生产不?还是说标准库里的性能会更好一点

    作者回复: 对,只是用于理解,不建议用在生产环境。标准库或者 flume,crossbream 下的 channel 性能更好,优先考虑

    2021-11-18
    2
  • Enoch Tang
    目前Sender 的Drop实现有bug,考虑以下场景: Receiver thread: 1. 加锁 2. 判断 sender 是否为0; 3. 使用CondVar wait 假设在 Receiver thread 执行完2之后切换到 其他线程执行 drop s 的操作,其他线程notify_all,此时 Receiver thread还没有进入到wait的状态,那么后面Receiver thread进入到wait状态后再也不会被唤醒了
    2023-11-11归属地:浙江
    1
  • 约书亚
    Drop for Sender<T>的实现中,notify_all没有在Mutex的保护下进行。这是否会导致一种可能性: 1. receiver发现sender不为0,准备进入wait 2. sender被drop,减一,为0,执行notify_all 3. receiver执行wait,释放锁,阻塞住 此时已经没有sender能唤醒receiver ???
    2022-11-02归属地:天津
    2
    1
  • 爱学习的小迪
    看老师的课程真的是可以学习到很多东西啊,不止Rust。太爽了
    2022-04-10
    1
收起评论
显示
设置
留言
18
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部