Go 并发编程实战课
晁岳攀(鸟窝)
前微博技术专家,知名微服务框架 rpcx 作者
立即订阅
3858 人已学习
课程目录
已完结 22 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 想吃透Go并发编程,你得这样学!
免费
基本并发原语 (11讲)
01 | Mutex:如何解决资源并发访问问题?
02 | Mutex:庖丁解牛看实现
03|Mutex:4种易错场景大盘点
04| Mutex:骇客编程,如何拓展额外功能?
05| RWMutex:读写锁的实现原理及避坑指南
06 | WaitGroup:协同等待,任务编排利器
07 | Cond:条件变量的实现机制及避坑指南
08 | Once:一个简约而不简单的并发原语
09 | map:如何实现线程安全的map类型?
10 | Pool:性能提升大杀器
11 | Context:信息穿透上下文
原子操作 (1讲)
12 | atomic:要保证原子操作,一定要使用这几种方法
Channel (3讲)
13 | Channel:另辟蹊径,解决并发问题
14 | Channel:透过代码看典型的应用模式
15 | 内存模型:Go如何保证并发读写的顺序?
扩展并发原语 (3讲)
16 | Semaphore:一篇文章搞懂信号量
17 | SingleFlight 和 CyclicBarrier:请求合并和循环栅栏该怎么用?
18 | 分组操作:处理一组子任务,该用什么并发原语?
分布式并发原语 (2讲)
19 | 在分布式环境中,Leader选举、互斥锁和读写锁该如何实现?
20 | 在分布式环境中,队列、栅栏和STM该如何实现?
结束语 (1讲)
结束语 | 再聊Go并发编程的价值和精进之路
Go 并发编程实战课
15
15
1.0x
00:00/00:00
登录|注册

13 | Channel:另辟蹊径,解决并发问题

晁岳攀 2020-11-09
你好,我是鸟窝。
Channel 是 Go 语言内建的 first-class 类型,也是 Go 语言与众不同的特性之一。Go 语言的 Channel 设计精巧简单,以至于也有人用其它语言编写了类似 Go 风格的 Channel 库,比如docker/libchantylertreat/chan,但是并不像 Go 语言一样把 Channel 内置到了语言规范中。从这一点,你也可以看出来,Channel 的地位在编程语言中的地位之高,比较罕见。
所以,这节课,我们就来学习下 Channel。

Channel 的发展

要想了解 Channel 这种 Go 编程语言中的特有的数据结构,我们要追溯到 CSP 模型,学习一下它的历史,以及它对 Go 创始人设计 Channel 类型的影响。
CSP 是 Communicating Sequential Process 的简称,中文直译为通信顺序进程,或者叫做交换信息的循序进程,是用来描述并发系统中进行交互的一种模式。
CSP 最早出现于计算机科学家 Tony Hoare 在 1978 年发表的论文中(你可能不熟悉 Tony Hoare 这个名字,但是你一定很熟悉排序算法中的 Quicksort 算法,他就是 Quicksort 算法的作者,图灵奖的获得者)。最初,论文中提出的 CSP 版本在本质上不是一种进程演算,而是一种并发编程语言,但之后又经过了一系列的改进,最终发展并精炼出 CSP 的理论。CSP 允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Go 并发编程实战课》,如需阅读全部文章,
请订阅文章所属专栏
立即订阅
登录 后留言

精选留言(28)

  • 坚白同异
    思考题
    1.
    func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    ch4 := make(chan int)
    go func() {
    for {
    fmt.Println("I'm goroutine 1")
    time.Sleep(1 * time.Second)
    ch2 <-1 //I'm done, you turn
    <-ch1
    }
    }()

    go func() {
    for {
    <-ch2
    fmt.Println("I'm goroutine 2")
    time.Sleep(1 * time.Second)
    ch3 <-1
    }

    }()

    go func() {
    for {
    <-ch3
    fmt.Println("I'm goroutine 3")
    time.Sleep(1 * time.Second)
    ch4 <-1
    }

    }()

    go func() {
    for {
    <-ch4
    fmt.Println("I'm goroutine 4")
    time.Sleep(1 * time.Second)
    ch1 <-1
    }

    }()



    select {}
    }
    2.双向通道可以赋值给单向,反过来不可以.
    2020-11-09
    1
    7
  • Junes
    第一个问题实现的方法有很多,最常规的是用4个channel,我这边分享一个用单channel实现的思路:
    因为channel的等待队列是先入先出的,所以我这边取巧地在goroutine前加一个等待时间,保证1~4的goroutine,他们在同个chan阻塞时是有序的

    func main() {
    ch := make(chan struct{})
    for i := 1; i <= 4; i++ {
    go func(index int) {
    time.Sleep(time.Duration(index*10) * time.Millisecond)
    for {
    <-ch
    fmt.Printf("I am No %d Goroutine\n", index)
    time.Sleep(time.Second)
    ch <- struct{}{}
    }
    }(i)
    }
    ch <- struct{}{}
    time.Sleep(time.Minute)
    }
    2020-11-12
    3
    5
  • fhs
    func f(i int, input <-chan int, output chan<- int) {
    for {
    <-input
    fmt.Println(i)
    time.Sleep(time.Second)
    output <- 1
    }
    }
    func TestChannelPlan(t *testing.T) {
    c := [4]chan int{}
    for i := range []int{1, 2, 3, 4} {
    c[i] = make(chan int)
    }
    go f(1, c[3], c[0])
    go f(2, c[0], c[1])
    go f(3, c[1], c[2])
    go f(4, c[2], c[3])
    c[3] <- 1
    select {}
    }
    2020-11-11
    3
  • 罗帮奎
    之前使用go-micro时候就遇到过,unbufferd chan导致的goroutine泄露的bug,当时情况是并发压力大导致rpc调用超时,超时退出当前函数导致了goroutine泄露,go-micro有一段类似的使用unbuffered chan的代码,后来改成了buffer=1
    2020-11-15
    2
  • 虫子樱桃
    /*
     * Permission is hereby granted, free of charge, to any person obtaining a copy
     * of this software and associated documentation files (the "Software"), to deal
     * in the Software without restriction, including without limitation the rights
     * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     * copies of the Software, and to permit persons to whom the Software is
     * furnished to do so, subject to the following conditions:
     * The above copyright notice and this permission notice shall be included in
     * all copies or substantial portions of the Software.
     * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     * FITNESS FOR A PARTICULAR PURPOSE AND NONINFINGEMENT. IN NO EVENT SHALL THEq
     * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     * THE SOFTWARE.
     */

    package main

    import (
    "fmt"
    "time"
    )

    type NumberChan struct {
    Ch chan int
    ChannelNumber int
    }

    func (nch *NumberChan) SendNotify() {
    go func() {
    nch.Ch <- nch.ChannelNumber
    }()
    }

    func (nch *NumberChan) PrintInfo() {
    fmt.Println(nch.ChannelNumber)
    time.Sleep(time.Second)
    }

    func NewNumberChan(seq int) *NumberChan {
    nch := NumberChan{
    Ch: make(chan int),
    ChannelNumber: seq,
    }
    return &nch
    }

    func main() {
    var (
    nch1 = NewNumberChan(1)
    nch2 = NewNumberChan(2)
    nch3 = NewNumberChan(3)
    nch4 = NewNumberChan(4)
    )
    go func() {
    nch1.SendNotify()
    }()
    for {
    select {
    case <-nch1.Ch:
    nch1.PrintInfo()
    nch2.SendNotify()
    case <-nch2.Ch:
    nch2.PrintInfo()
    nch3.SendNotify()
    case <-nch3.Ch:
    nch3.PrintInfo()
    nch4.SendNotify()
    case <-nch4.Ch:
    nch4.PrintInfo()
    nch1.SendNotify()
    }
    }

    }
    2020-11-12
    1
  • Stony.修行僧
    一个 goroutine 可以把数据的“所有权”交给另外一个 goroutine(虽然 Go 中没有“所有权”的概念,但是从逻辑上说,你可以把它理解为是所有权的转移)
    这是要推广 Rust啊
    2020-11-11
    1
  • QSkerry
    一般来说,单向通道有什么用呢?
    2020-11-09
    2
    1
  • 蜗牛🐌
    channl1 := make(chan int, 0)
    go func() {
    c := time.NewTicker(1 * time.Second)
    i := 0
    for {
    select {
    case <-c.C:
    i++
    channl1 <- i
    if i == 4 {
    i = 0
    }
    }
    }
    }()
    for {
    select {
    case i := <-channl1:
    fmt.Println(i)
    }
    }
    2021-01-29
  • ( ・᷄ὢ・᷅ )
    1.func Print(recv chan int) {
    for {
    index, ok := <-recv
    if !ok {
    return
    }
    fmt.Println(index)
    index++
    if index == 5 {
    index = 1
    }

    time.Sleep(1 * time.Second)
    recv <- index
    }
    }

    func main() {
    c := make(chan int)
    go Print(c)
    go Print(c)
    go Print(c)
    go Print(c)
    c <- 1
    time.Sleep(20 * time.Second)
    close(c)
    }
    2.不可以
    2021-01-24
  • Assassin
    send 第6部分没有代码吗?
    还有 buf 是循环队列空闲空间的指针吗?第6部分判断 buf 满和第2部分判断队列满什么区别啊?
    2020-12-21
  • Atong

    type NumChan struct {
    jobs []*Job
    }

    func (n *NumChan) JobNum(m int) {
    for i := 1; i <= m; i++ {
    job := &Job{
    ID: i,
    Jobc: make(chan int, 1),
    }
    go job.run()
    n.jobs = append(n.jobs, job)
    }
    n.run()
    }
    func (n *NumChan) run() {
    for {
    n.seq()
    }
    }
    func (n *NumChan) seq() {
    for _, j := range n.jobs {
    j.Jobc <- 1
    time.Sleep(time.Second * 1)
    }
    }

    type Job struct {
    ID int
    Jobc chan int
    }

    func (j *Job) run() {
    for {
    select {
    case <-j.Jobc:
    fmt.Printf("id %d\n", j.ID)
    }
    }
    }

    func main() {
    n := &NumChan{}
    n.JobNum(4)
    }
    2020-12-21
  • 星星之火
    channel 中包含的 mutex 是什么呢,和课程最开始的 sync.mutex 是同一个东西吗?
    因为 sync.mutex 是依赖 channel 实现的,感觉应该不是同一个 mutex?

    作者回复: 不是同一个,只是类似。channel中这个是运行时内部使用的mutex

    2020-12-05
  • ldeng 7
    对于 select 相关实现的源码个人认为还应该讲一下
    2020-12-02
  • 思维
    延伸阅读:https://github.com/developer-learning/reading-go/issues/450#issuecomment-524663059
    这里对channel的分析更详细些
    2020-11-27
  • 孟凡浩
    第一题:
    func TestFourGo(t *testing.T) {
    count := 6
    ch := make([]chan bool, 0)
    for i := 0; i < count; i++ {
    ch = append(ch, make(chan bool))
    }

    gor := func(ch1 chan bool, ch2 chan bool, num int8) {
    for {
    <-ch1
    time.Sleep(time.Second)
    fmt.Println(num)
    ch2 <- true
    }
    }
    for i := 0; i < count; i++ {
    n := i + 1
    if i == count-1 {
    n = 0
    }
    go gor(ch[i], ch[n], int8(i+1))
    }
    ch[0] <- true

    time.Sleep(time.Hour)
    }
    2020-11-24
  • JYZ1024
    /******* 四个协程,循环输出1,2,3,4 ***********/
    /*有四个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,
    要求你编写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4、……的顺序打印出来。
    */
    type Tasker struct {
    Id int
    SignalCh chan struct{}
    BroadcastChan chan struct{}
    }

    func NewTask(id int, ch chan struct{},bch chan struct{}) *Tasker {
    return &Tasker{
    Id: id,
    SignalCh: ch,
    BroadcastChan: bch,
    }
    }

    func (t *Tasker) say() {
    for {
    <-t.SignalCh
    fmt.Println(t.Id)
    time.Sleep(time.Second*1)
    t.BroadcastChan <- struct{}{}
    }
    }

    func ChannelArrange(rNum int) {
    chanArray := make([]chan struct{},0,rNum)
    for i:=0;i<rNum;i++ {
    chanArray = append(chanArray,make(chan struct{}))
    }
    worker := make([]*Tasker,0,rNum)
    for i:=0;i<rNum;i++ {
    worker = append(worker,NewTask(i+1,chanArray[(i-1+rNum)%rNum],chanArray[(i + rNum)%rNum]))
    }

    for i:=0;i<rNum;i++ {
    curIndex := i
    go func() {
    worker[curIndex].say()
    }()
    }
    chanArray[rNum-1] <- struct{}{}

    select {

    }
    }
    2020-11-23
  • 石头娃
    思考题:

    func main() {
    var a = make(chan int, 1)
    var b = make(chan int, 1)
    var c = make(chan int, 1)
    var d = make(chan int, 1)
    var e = make(chan string)
    go func() {
    for {
    flag := <-d
    log.Println(1)
    a <- flag
    }
    }()
    go func() {
    for {
    flag := <-a
    log.Println(2)
    b <- flag
    }
    }()
    go func() {
    for {
    flag := <-b
    log.Println(3)
    c <- flag
    }
    }()
    go func() {
    for {
    flag := <-c
    log.Println(4)
    time.Sleep(time.Second)
    d <- flag
    }
    }()
    d <- 1
    <-e
    }

    作者回复: 逻辑没问题,符合答案。如果代码可以抽象更好,减少重复代码

    2020-11-19
  • 王德彪
    [close通过 close 函数,
    可以把 chan 关闭,编译器会替换成 closechan 方法的调用。下面的代码是 close chan 的主要逻辑。如果 chan 为 nil,close 会 panic;如果 chan 已经 closed,再次 close 也会 panic。
    否则的话,如果 chan 不为 nil,chan 也没有 closed,就把等待队列中的 sender(writer)和 receiver(reader)从队列中全部移除并唤醒。]
    疑问:老师你好,全部移除能明白,为什么要唤醒的?

    作者回复: 因为队列中的这些reader/sender都被阻塞住了,close chan唤醒它们,让它们继续工作,否则就永远阻塞了

    2020-11-15
  • 朱伟
    我的场景是一个生产者消费者模型,生产者和消费者是并发执行,生产者把数据生产完之后会关闭channel,消费者改如何退出
    for {
    qv, ok := <-qvCh
    if !ok {
    //消费者退出

    }
    if qv == nil || len(qv) == 0 {
    continue
    } else {
    //消费者业务逻辑
    }
    }
    我发现有时候生产者还没生产数据消费者就退出结束了,这个写法有问题吗
    2020-11-14
    1
  • Panmax
    recv 的第四部分的描述是不是不太对,这里并没有检查 buf,而是直接检查 sender队列,优先把sender队列中的数据给出去。

    原文中写的是「第四部分是处理 sendq 队列中有等待者的情况。这个时候,如果 buf 中有数据,优先从 buf 中读取数据,否则直接从等待队列中弹出一个 sender,把它的数据复制给这个 receiver。」

    作者回复: 是的,描述错误,已通知编辑修改,谢谢

    2020-11-14
收起评论
28
返回
顶部