35|即学即练:如何实现一个轻量级线程池?
该思维导图由 AI 生成,仅供参考
为什么要用到 Goroutine 池?
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了如何在Go语言中实现一个轻量级线程池的原理和实现方式。首先解释了为什么需要使用Goroutine池,指出了Goroutine的开销虽小,但规模化后也会成为瓶颈,因此需要一种解决方案来提高计算资源的利用率。文章详细介绍了workerpool的实现原理,包括pool的创建与销毁、pool中worker的管理以及task的提交与调度。通过使用channel+select的实现方案,展示了如何利用Go的并发模型和并发原语间的协作来实现轻量级线程池。文章还介绍了功能选项机制的实现方式,让某个包的用户可以根据自己的需求,通过设置不同功能选项来定制包的行为。通过理论分析和实际代码演示,帮助读者了解了如何利用Go语言实现一个高效的轻量级线程池,为读者提供了宝贵的技术指导和实践经验。文章内容详实,逻辑清晰,对于想要深入了解轻量级线程池实现原理的读者来说,是一篇非常有价值的技术文章。
《Tony Bai · Go 语言第一课》,新⼈⾸单¥59
全部留言(31)
- 最新
- 精选
- ivhong非常感谢老师带着做了一次这样的实现,因为我自己也尝试过这种实现(纯粹是为了学习用)。有几个问题我不是特别明白,不知道是不是和老师理解的一样,望老师闲暇之余给予指正,谢谢! 1. 这个是不是叫“协程池”,为什么叫做“线程池”?两者有什么区别呢?或者是到底什么是“协程”呢? 2. 是不是这节课的实现,也纯粹是为了学习而实现的,个人理解,go实现Goroutine,就是为了解决“线程池”的繁琐,让“并发”实现的不用那么的麻烦,如果是超小“任务”,不用考虑线程频繁切换导致系统资源的浪费。如果再实现“协程池”的话,是不是丢失了这种优点? 3. 常驻内存的Goroutine,反复使用,会导致这个Goroutine的内存越来越大,或者其他隐藏的风险么?
作者回复: 好问题。 1. 传统理解的coroutine一般是面向协作式,而非抢占式。像python中通过yield关键字创建的协程,与主routine之间是在一个线程上实现的切换执行,从设计角度是通过coroutine实现了并发(concurrency),但其实它们还是串行执行的,不会真正并行(paralellism),即便在多核处理器上。 基于上面的理解,我们就可以意识到goroutine并非传统意义上的coroutine,是支持抢占的,而且也必须依赖抢占实现runtime对goroutine的调度。它更像thread,可以绑定不同的cpu核并行执行(如果是在多核处理器上的话)。同时基于goroutine的设计也会一种并发的设计。 而goroutine与thread又不同,goroutine是在用户层(相较于os的内核层)调度的,os并不知道其存在,goroutine的切换相对轻量。而thread是os 来调度的,切换代价更高一些。 所以文中将goroutine称为“轻量级线程”,而不是协程。 2. 你理解的没错。这节课是为了演示goroutine、channel之间的调度与通信机制而“设计”出来的。goroutine使用代价很低,通常不用考虑池化。但是在一些大型网络服务程序时,一旦goroutine数量过多,内存占用以及调度goroutine的代价就不能不考虑了。于是有了“池化”的思路。这与传统的线程池的思路的确是一脉相承的 3. go是gc的,内存不会越来越大。
2022-03-1711 - $侯老师您好请教几个问题: 第一个问题,demo1中没有看到p.Free的代码示例,Free方法只是向p.quit <- struct{}{}发送一个空结构体就可以吗,请教下Free方式该如何写 第二个问题,demo1中好像也没看看到p.wg.Wait()
作者回复: 原文中有源码的链接,在最后。 源码在 https://github.com/bigwhite/publication/tree/master/column/timegeek/go-first-course/35 看了后,就可以回答你的问题了。
2022-01-1773 - Darren老师以下几个问题哈: 1、第一种实现中,这块是不是有点问题: go func() { defer func() { if err := recover(); err != nil { fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err) <-p.active } p.wg.Done() }() <-p.active是不是应该要放到if的外面,如果task执行本身没有出错,正常结束了,active没有减少的地方 2、这块文字描述有点问题,p<-active应该是<-p.active “使用了 defer+recover 对 panic 进行捕捉,捕捉后 worker 也是要退出的,于是我们还通过p<-active更新了 worker 计数器” 3、第二种实现中,当没有提前创建worker,那么当tasks中有任务的时候,p.returnTask方法是干啥的?文章中没有这个方法,且文字也没有说明呀 func (p *Pool) run() { idx := len(p.active) if !p.preAlloc { loop: for t := range p.tasks { p.returnTask(t) select { case <-p.quit: return case p.active <- struct{}{}: idx++ p.newWorker(idx) default: break loop } } }
作者回复: 看的真细致!👍 1. worker一旦创建后,除了panic和quit通知退出,worker是不会退出的,也就是没有所谓“正常退出”的情况。所以没在defer中调用<-p.active。 2. 的确是笔误,感谢指出。 3. 在文后有源码链接。这里的task仅是触发了worker创建,这里是调度循环,不处理task,所以要把task扔回tasks channel,等worker启动后再处理。
2022-01-17123 - Geek_0d5d37老师您好,这个段代码作用我也不太理解 if !p.preAlloc { loop: for t := range p.tasks { p.returnTask(t) select { case <-p.quit: return case p.active <- struct{}{}: idx++ p.newWorker(idx) default: break loop } } } 您在留言中回答 当preAlloc=false时有用 ,如果是这样demo1 就是等于fasle的情况没使用这段代码的 ,请老师有空回答一下
作者回复: 当preAlloc=false时,即不预分配时。这样就根据tasks的情况来创建worker。如果当前没有task,实际上系统中没有worker被创建出来,直到有task才会创建worker。一直到active channel满了!这样pool中所有worker都创建出来后,再跳出循环,进入下面的quit监听。 不过workpool2的这段代码的确有refactor的空间😁。可以写的更好理解一些。
2023-04-06归属地:四川31 - Six Days请教一下,池化的话,当前的demo1场景是不是没有考虑使用同一个worker进行t任务的处理,而是通过不断的创建 Goroutine实现的,通过capacity控制了处理任务Goroutine的数量,通过Go gc 来实现Goroutine的回收,是不是因为Goroutine 的占内存比较小,为此没有做Goroutine 的复用,所以采用不断创建,还是当前为了简单演示呢,实际还是需要复用Goroutine 的呢?
作者回复: demo1创建goroutine后没有回收,一直是复用的,最多创建capacity个goroutine,直到pool销毁。
2023-03-09归属地:广东21 - Six Days请教一下,p.active 的chan 容量指定是capacity,而只有run的时候,才会通过p.active <- struct{}{} 往p.active中丢东西,p.active 才会变多,达到capacity时。任务T则阻塞,我理解 run 只会New的时候触发,请问是否与文中描述一致呢? func (p *Pool) run() { idx := 0 for { select { case <-p.quit: return case p.active <- struct{}{}: // create a new worker idx++ p.newWorker(idx) } } }
作者回复: run在New中调用,active满了,代表capacity个worker goroutine已经创建完毕,后续将重用这些goroutine。这时候run会阻塞在select上直到quit channel有数据才会退出。p.active 那个case后续没用了。
2023-03-09归属地:广东1 - demajiaoif !p.preAlloc { loop: for t := range p.tasks { p.returnTask(t) select { case <-p.quit: return case p.active <- struct{}{}: idx++ p.newWorker(idx) default: break loop } } } 这段代码感觉没用呀。
作者回复: 当preAlloc=false时,即不预分配时,有用。
2023-01-22归属地:浙江1 - 撕影为何关键变化不写出来?太仓促了吧,一篇最后一节以没看懂收场,对学生打击可不小啊老师
作者回复: “关键变化”,指的是?
2023-01-14归属地:湖南1 - Sunrise考虑到 Goroutine 调度的次序的不确定性,这里我在创建 workerpool 与真正开始调用 Schedule 方法之间,做了一个 Sleep,尽量减少 Schedule 都返回失败的频率 这块也不太懂,为啥不加 Sleep 会全返回失败呢?
作者回复: 最后一版Schedule加入了default分支,当pool资源不够又设置为non block时,schedule肯定会返回error啊。
2022-11-24归属地:辽宁1 - Sunrise有几个问题不大理解,望老师抽空解答: 1)自引用函数与选项设计是为了解决 go 函数没有默认参数和可选参数吗? go 函数为什么没有设计默认参数和可选参数呢? 2)为什么下面的 for { select ... } 放到 goroutine 中 才会输出 ch2: 2 ch1: 1 done, 如果直接放到外面只会输出 done? func TestSelect(t *testing.T) { ch1 := make(chan int) ch2 := make(chan int) go func() { ch1 <- 1 }() go func() { ch2 <- 2 }() go func() { for { select { case i := <-ch1: fmt.Println("ch1:", i) case j := <-ch2: fmt.Println("ch2:", j) default: fmt.Println("done") return } } }() // ch2: 2 ch1: 1 done }
作者回复: 问题1:主要是为了可选参数吧。为什么go没有原生支持默认参数与可选参数,我猜是因为go设计者压根就不想引入这个复杂性。 问题2: 和goroutine的调度顺序有关。
2022-11-23归属地:北京1