Go 并发编程实战课
晁岳攀(鸟窝)
前微博技术专家,知名微服务框架 rpcx 作者
25635 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 22 讲
Go 并发编程实战课
15
15
1.0x
00:00/00:00
登录|注册

14 | Channel:透过代码看典型的应用模式

map-reduce
Stream
扇出模式
扇入模式
Or-Done模式
使用chan实现互斥锁
程序关闭时的清理动作
实现wait/notify设计模式
任务编排题的实现
"击鼓传花"的游戏
etcd中的node节点的实现
worker池的例子
任务编排
信号通知
数据传递
消息交流
方法签名:func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
通过reflect.Select函数动态处理多个chan
思考题
典型的应用场景
反射操作Channel
Channel:透过代码看典型的应用模式

该思维导图由 AI 生成,仅供参考

你好,我是鸟窝。
前一讲,我介绍了 Channel 的基础知识,并且总结了几种应用场景。这一讲,我将通过实例的方式,带你逐个学习 Channel 解决这些问题的方法,帮你巩固和完全掌握它的用法。
在开始上课之前,我先补充一个知识点:通过反射的方式执行 select 语句,在处理很多的 case clause,尤其是不定长的 case clause 的时候,非常有用。而且,在后面介绍任务编排的实现时,我也会采用这种方法,所以,我先带你具体学习下 Channel 的反射用法。

使用反射操作 Channel

select 语句可以处理 chan 的 send 和 recv,send 和 recv 都可以作为 case clause。如果我们同时处理两个 chan,就可以写成下面的样子:
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
如果需要处理三个 chan,你就可以再添加一个 case clause,用它来处理第三个 chan。可是,如果要处理 100 个 chan 呢?一万个 chan 呢?
或者是,chan 的数量在编译的时候是不定的,在运行的时候需要处理一个 slice of chan,这个时候,也没有办法在编译前写成字面意义的 select。那该怎么办?
这个时候,就要“祭”出我们的反射大法了。
通过 reflect.Select 函数,你可以将一组运行时的 case clause 传入,当作参数执行。Go 的 select 是伪随机的,它可以在执行的 case 中随机选择一个 case,并把选择的这个 case 的索引(chosen)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值,这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。Select 的方法签名如下:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了Channel的基础知识和典型的应用模式,包括扇出模式、Stream、map-reduce等。通过实例演示了如何使用反射操作Channel,解决处理不定数量的chan的问题。详细讨论了Channel在消息交流、数据传递和信号通知方面的典型应用场景,包括worker池、etcd中的node节点的实现、顺序的数据传递、wait/notify模式和程序的graceful shutdown。此外,还介绍了使用chan实现互斥锁的两种不同实现方式,以及多个chan的编排方式,包括Or-Done模式、扇入模式等。文章通过递归、反射等方式展示了这些模式的实现方法,为读者提供了丰富的技术参考。总结来看,本文内容丰富,涵盖了Channel的多种应用场景和实现方式,对于读者了解和掌握Channel的应用具有重要参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Go 并发编程实战课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(32)

  • 最新
  • 精选
  • 润豪
    channel 来实现互斥锁,优势是 trylock,timeout 吧,因为mutex 没有这些功能。否则的话,是不是用回 mutex 呢

    作者回复: 对。如果不需要这些特性,我的建议是使用mutex

    2020-11-14
    2
    10
  • 茶澜子
    老师好,我在测试扇出的例子的时候,在异步运行的时候出现了 panic: send on closed channel 的错误 // 扇出模式 func FunOut(ch <-chan int, n int, async bool) []chan int { var outs []chan int for i := 0; i < n; i++ { outs = append(outs, make(chan int)) } go func() { defer func() { for i := 0; i < len(outs); i++ { close(outs[i]) } }() for v := range ch { v := v for i := 0; i < n; i++ { i := i if async { go func() { outs[i] <- v }() } else { outs[i] <- v } } } }() return outs } // TestFunOut 异步操作 扇入模式 func TestFunOutAsync(t *testing.T) { dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} inputChan := gen(dataStreams...) // 将数据写入一个channel ch := sq(inputChan) // 将所有的数据平方,再重新放入channel outArray := FunOut(ch, 3,true) length := len(outArray) t.Log("length of out channel:", length) var wg sync.WaitGroup wg.Add(length) for i := 0; i < length; i++ { c:=len(outArray[i]) fmt.Println("输入chan len", i, c) go func(in <-chan int, index int) { sum:=0 for item:=range in{ fmt.Println("item", index, item) sum+=item } fmt.Println("worker", index, sum) wg.Done() }(outArray[i], i) } wg.Wait() } 老师,我没看明白是哪里出错了?

    作者回复: 这段代码在异步的情况下会有问题。fanout在退出时会把out全close,这时异步还在读着。很好的发现,你可以利用所学的尝试解决这个问题

    2020-11-17
    4
    5
  • 斯蒂芬.赵
    老师想问一下上面所讲的击鼓传花的案例(流水线模式)的应用场景是?感觉就是按照顺序串行的话执行某些任务逻辑,不用goroutine的话也可以吧

    作者回复: 和工业化流水线一个意思。每个goroutine负责一个业务

    2021-11-15
    1
  • myrfy
    老师好,我有两个问题 1、关于or done或者fan in模式,我之前在sof上看到过类似的问题,其中的高赞回答是说,启动与ch数量相等的goroutine,每个goroutine监听一个ch并把读到的结果放入一个收集ch的模式效率要比反射高,并且给出了测评数据,现在手机码字,不太好找到。但想和老师确认一下是不是后面go某个版本对反射做了优化呢? 2、fanout模式里提到可以同步或者异步启动任务。在老师给出的示例代码中,异步启动的优势是什么呢?我猜老师想表达的是不是启动任务前可能还有一些耗时的准备操作?如果是这样的话,建议增加一个注释,否则感觉启动一个goroutine只是为了写一个ch,好像异步效率会更低

    作者回复: 我并没有benchmark结果列在这里,凭经验我们也知道反射的效率很低。Francesc Campoy有一篇文章专门做了测试,你可以搜一下。 analyzing the performance of go functions with benchmarks. 异步的方式并不是你所说的目的,而是避免一个out chan阻塞的时候影响其他out

    2020-11-11
    1
  • Tokamak
    晁老师,这是我用多个goroutine 实现的 OrDone,但是有一个问题是在 执行close(orDone)是可能会引发panic,引发panic的原因是因为可能多次关闭一个chan, 请问我这个需要如何改才能不引发panic呢? func or3(channels ...<-chan interface{}) <-chan interface{} { if len(channels) == 0 { return nil } orDone := make(chan interface{}) for _, c := range channels { go func(ch <-chan interface{}) { <-ch close(orDone) // 可能会引发panic }(c) } return orDone }

    作者回复: 有多种方法,比如使用sync.Once,或者放在一个函数中加上recover机制

    2022-11-19归属地:北京
  • A9
    一个基于 TCP 网络的分布式的 Channel ,请问这个有git仓库吗,想学习下

    作者回复: https://github.com/chrislusf/netchan

    2022-09-13归属地:北京
  • Geek_a6104e
    请问一下最后一段代码里面case <-done: 有什么用呢?

    作者回复: 可以用来主动退出

    2022-07-31归属地:北京
  • པག་ཏོན་།
    老师您好,在看Marcio Castilho 在 使用 Go 每分钟处理百万请求的例子的时候我很困惑,我认为双层管道是没有意义的,生产者直接发送job给消费者,额定数量的消费者直接进行并发接收并处理就可以达到同样控制并发的效果。为什么非要消费者把一个管道交给生产者,生产者在把job通过管道传递给消费者。我想请问一下这个步骤的作用是什么?

    作者回复: 我也觉得他的实现方案有问题,有点感觉是从其它语言的实现复制过来的,有点设计过度。 我和你的设计方案也有些不同,你的方案类似scala的actor模式。这种模式有个问题,如果某个actor处理慢了,可能会导致任务堆积在这个actor上。 go很常规的操作就是用一个channel实现buffer,各个goroutine都从这个buffer获取任务就好了。

    2021-10-10
  • davix
    請問老師,channel這些模式都適合哪些塲景使用,能哪些優缺點能講講嗎

    作者回复: 以后有机会扩充一下

    2021-10-06
  • 科科
    老师,请问下为什么在createCase函数里面,我们在创建一个SelectCase变量的时候,要使用reflect.ValueOf重新初始化一个的channel?

    作者回复: 没有重新,每一个建一个

    2021-05-20
收起评论
显示
设置
留言
32
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部