14 | Channel:透过代码看典型的应用模式
该思维导图由 AI 生成,仅供参考
使用反射操作 Channel
- 深入了解
- 翻译
- 解释
- 总结
本文深入介绍了Channel的基础知识和典型的应用模式,包括扇出模式、Stream、map-reduce等。通过实例演示了如何使用反射操作Channel,解决处理不定数量的chan的问题。详细讨论了Channel在消息交流、数据传递和信号通知方面的典型应用场景,包括worker池、etcd中的node节点的实现、顺序的数据传递、wait/notify模式和程序的graceful shutdown。此外,还介绍了使用chan实现互斥锁的两种不同实现方式,以及多个chan的编排方式,包括Or-Done模式、扇入模式等。文章通过递归、反射等方式展示了这些模式的实现方法,为读者提供了丰富的技术参考。总结来看,本文内容丰富,涵盖了Channel的多种应用场景和实现方式,对于读者了解和掌握Channel的应用具有重要参考价值。
《Go 并发编程实战课》,新⼈⾸单¥59
全部留言(32)
- 最新
- 精选
- 润豪channel 来实现互斥锁,优势是 trylock,timeout 吧,因为mutex 没有这些功能。否则的话,是不是用回 mutex 呢
作者回复: 对。如果不需要这些特性,我的建议是使用mutex
2020-11-14210 - 茶澜子老师好,我在测试扇出的例子的时候,在异步运行的时候出现了 panic: send on closed channel 的错误 // 扇出模式 func FunOut(ch <-chan int, n int, async bool) []chan int { var outs []chan int for i := 0; i < n; i++ { outs = append(outs, make(chan int)) } go func() { defer func() { for i := 0; i < len(outs); i++ { close(outs[i]) } }() for v := range ch { v := v for i := 0; i < n; i++ { i := i if async { go func() { outs[i] <- v }() } else { outs[i] <- v } } } }() return outs } // TestFunOut 异步操作 扇入模式 func TestFunOutAsync(t *testing.T) { dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} inputChan := gen(dataStreams...) // 将数据写入一个channel ch := sq(inputChan) // 将所有的数据平方,再重新放入channel outArray := FunOut(ch, 3,true) length := len(outArray) t.Log("length of out channel:", length) var wg sync.WaitGroup wg.Add(length) for i := 0; i < length; i++ { c:=len(outArray[i]) fmt.Println("输入chan len", i, c) go func(in <-chan int, index int) { sum:=0 for item:=range in{ fmt.Println("item", index, item) sum+=item } fmt.Println("worker", index, sum) wg.Done() }(outArray[i], i) } wg.Wait() } 老师,我没看明白是哪里出错了?
作者回复: 这段代码在异步的情况下会有问题。fanout在退出时会把out全close,这时异步还在读着。很好的发现,你可以利用所学的尝试解决这个问题
2020-11-1745 - 斯蒂芬.赵老师想问一下上面所讲的击鼓传花的案例(流水线模式)的应用场景是?感觉就是按照顺序串行的话执行某些任务逻辑,不用goroutine的话也可以吧
作者回复: 和工业化流水线一个意思。每个goroutine负责一个业务
2021-11-151 - myrfy老师好,我有两个问题 1、关于or done或者fan in模式,我之前在sof上看到过类似的问题,其中的高赞回答是说,启动与ch数量相等的goroutine,每个goroutine监听一个ch并把读到的结果放入一个收集ch的模式效率要比反射高,并且给出了测评数据,现在手机码字,不太好找到。但想和老师确认一下是不是后面go某个版本对反射做了优化呢? 2、fanout模式里提到可以同步或者异步启动任务。在老师给出的示例代码中,异步启动的优势是什么呢?我猜老师想表达的是不是启动任务前可能还有一些耗时的准备操作?如果是这样的话,建议增加一个注释,否则感觉启动一个goroutine只是为了写一个ch,好像异步效率会更低
作者回复: 我并没有benchmark结果列在这里,凭经验我们也知道反射的效率很低。Francesc Campoy有一篇文章专门做了测试,你可以搜一下。 analyzing the performance of go functions with benchmarks. 异步的方式并不是你所说的目的,而是避免一个out chan阻塞的时候影响其他out
2020-11-111 - Tokamak晁老师,这是我用多个goroutine 实现的 OrDone,但是有一个问题是在 执行close(orDone)是可能会引发panic,引发panic的原因是因为可能多次关闭一个chan, 请问我这个需要如何改才能不引发panic呢? func or3(channels ...<-chan interface{}) <-chan interface{} { if len(channels) == 0 { return nil } orDone := make(chan interface{}) for _, c := range channels { go func(ch <-chan interface{}) { <-ch close(orDone) // 可能会引发panic }(c) } return orDone }
作者回复: 有多种方法,比如使用sync.Once,或者放在一个函数中加上recover机制
2022-11-19归属地:北京 - A9一个基于 TCP 网络的分布式的 Channel ,请问这个有git仓库吗,想学习下
作者回复: https://github.com/chrislusf/netchan
2022-09-13归属地:北京 - Geek_a6104e请问一下最后一段代码里面case <-done: 有什么用呢?
作者回复: 可以用来主动退出
2022-07-31归属地:北京 - པག་ཏོན་།老师您好,在看Marcio Castilho 在 使用 Go 每分钟处理百万请求的例子的时候我很困惑,我认为双层管道是没有意义的,生产者直接发送job给消费者,额定数量的消费者直接进行并发接收并处理就可以达到同样控制并发的效果。为什么非要消费者把一个管道交给生产者,生产者在把job通过管道传递给消费者。我想请问一下这个步骤的作用是什么?
作者回复: 我也觉得他的实现方案有问题,有点感觉是从其它语言的实现复制过来的,有点设计过度。 我和你的设计方案也有些不同,你的方案类似scala的actor模式。这种模式有个问题,如果某个actor处理慢了,可能会导致任务堆积在这个actor上。 go很常规的操作就是用一个channel实现buffer,各个goroutine都从这个buffer获取任务就好了。
2021-10-10 - davix請問老師,channel這些模式都適合哪些塲景使用,能哪些優缺點能講講嗎
作者回复: 以后有机会扩充一下
2021-10-06 - 科科老师,请问下为什么在createCase函数里面,我们在创建一个SelectCase变量的时候,要使用reflect.ValueOf重新初始化一个的channel?
作者回复: 没有重新,每一个建一个
2021-05-20