Go 并发编程实战课
晁岳攀(鸟窝)
前微博技术专家,知名微服务框架 rpcx 作者
5432 人已学习
立即订阅
登录后,你可以任选4讲全文学习
推荐试读
换一换
01 | Mutex:如何解决资源并发访问问题?
02 | Mutex:庖丁解牛看实现
05| RWMutex:读写锁的实现原理及避坑指南
课程目录
已完结/共 22 讲
开篇词 (1讲)
开篇词 | 想吃透Go并发编程,你得这样学!
基本并发原语 (11讲)
01 | Mutex:如何解决资源并发访问问题?
02 | Mutex:庖丁解牛看实现
03|Mutex:4种易错场景大盘点
04| Mutex:骇客编程,如何拓展额外功能?
05| RWMutex:读写锁的实现原理及避坑指南
06 | WaitGroup:协同等待,任务编排利器
07 | Cond:条件变量的实现机制及避坑指南
08 | Once:一个简约而不简单的并发原语
09 | map:如何实现线程安全的map类型?
10 | Pool:性能提升大杀器
11 | Context:信息穿透上下文
原子操作 (1讲)
12 | atomic:要保证原子操作,一定要使用这几种方法
Channel (3讲)
13 | Channel:另辟蹊径,解决并发问题
14 | Channel:透过代码看典型的应用模式
15 | 内存模型:Go如何保证并发读写的顺序?
扩展并发原语 (3讲)
16 | Semaphore:一篇文章搞懂信号量
17 | SingleFlight 和 CyclicBarrier:请求合并和循环栅栏该怎么用?
18 | 分组操作:处理一组子任务,该用什么并发原语?
分布式并发原语 (2讲)
19 | 在分布式环境中,Leader选举、互斥锁和读写锁该如何实现?
20 | 在分布式环境中,队列、栅栏和STM该如何实现?
结束语 (1讲)
结束语 | 再聊Go并发编程的价值和精进之路
Go 并发编程实战课
15
15
1.0x
00:00/00:00
登录|注册

07 | Cond:条件变量的实现机制及避坑指南

你好,我是鸟窝。
在写 Go 程序之前,我曾经写了 10 多年的 Java 程序,也面试过不少 Java 程序员。在 Java 面试中,经常被问到的一个知识点就是等待 / 通知(wait/notify)机制。面试官经常会这样考察候选人:请实现一个限定容量的队列(queue),当队列满或者空的时候,利用等待 / 通知机制实现阻塞或者唤醒。
在 Go 中,也可以实现一个类似的限定容量的队列,而且实现起来也比较简单,只要用条件变量(Cond)并发原语就可以。Cond 并发原语相对来说不是那么常用,但是在特定的场景使用会事半功倍,比如你需要在唤醒一个或者所有的等待者做一些检查操作的时候。
那么今天这一讲,我们就学习下 Cond 这个并发原语。

Go 标准库的 Cond

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。
顾名思义,Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
01 | Mutex:如何解决资源并发访问问题?
02 | Mutex:庖丁解牛看实现
05| RWMutex:读写锁的实现原理及避坑指南
13 | Channel:另辟蹊径,解决并发问题
14 | Channel:透过代码看典型的应用模式
结束语 | 再聊Go并发编程的价值和精进之路
该试读文章来自付费专栏《Go 并发编程实战课》,如需阅读全部文章,
请购买文章所属专栏
立即购买
登录 后留言

精选留言(29)

  • Alexdown
    思考题:
    1. 唤醒的方式有broadcast,第N个waiter被唤醒后需要检查等待条件,因为不知道前N-1个被唤醒的waiter所作的修改是否使等待条件再次成立。
    2. 以下是我实现的一个,有限容量Queue,欢迎讨论!
    https://play.studygolang.com/p/11K2iPVYErn

    package main

    import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    )

    type Queue struct {
    cond *sync.Cond
    data []interface{}
    capc int
    logs []string
    }

    func NewQueue(capacity int) *Queue {
    return &Queue{cond: &sync.Cond{L: &sync.Mutex{}}, data: make([]interface{}, 0), capc: capacity, logs: make([]string, 0)}
    }

    func (q *Queue) Enqueue(d interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    for len(q.data) == q.capc {
    q.cond.Wait()
    }
    // FIFO入队
    q.data = append(q.data, d)
    // 记录操作日志
    q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
    // 通知其他waiter进行Dequeue或Enqueue操作
    q.cond.Broadcast()

    }

    func (q *Queue) Dequeue() (d interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    for len(q.data) == 0 {
    q.cond.Wait()
    }
    // FIFO出队
    d = q.data[0]
    q.data = q.data[1:]
    // 记录操作日志
    q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))
    // 通知其他waiter进行Dequeue或Enqueue操作
    q.cond.Broadcast()
    return
    }

    func (q *Queue) Len() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    return len(q.data)
    }

    func (q *Queue) String() string {
    var b strings.Builder
    for _, log := range q.logs {
    //fmt.Fprint(&b, log)
    b.WriteString(log)
    }
    return b.String()
    }
    2020-10-27
    3
    23
  • 那时刻
    老师,上节课你提到noCopy,是一个辅助的、用来帮助 vet 检查用的类型,而Cond还有个copyChecker 是一个辅助结构,可以在运行时检查 Cond 是否被复制使用。

    nocpoy是静态检查,copyChecker是运行时检查,不是理解是否正确?

    另外不是是否有其他区别呢?

    作者回复: 是的

    2020-10-26
    9
  • SuperDai
    上一个自己的实现

    package caplimitqueue

    import (
    "sync"

    "github.com/gammazero/deque"
    )

    // CapLimitQueue 限容队列
    type CapLimitQueue struct {
    cond *sync.Cond
    q deque.Deque
    cap int
    }

    // NewCapLimitQueue 返回CapLimitQueue实例.
    func NewCapLimitQueue(cap int) *CapLimitQueue {
    if cap == 0 {
    cap = 64
    }
    q := &CapLimitQueue{
    cap: cap,
    }
    q.cond = sync.NewCond(&sync.Mutex{})
    return q
    }

    // Push 往限容队列添加数据对象.
    func (q *CapLimitQueue) Push(elem interface{}) {
    q.cond.L.Lock()
    for q.q.Len() >= q.cap {
    // (1) 队列已满, 等待消费goroutine取出数据对象.
    q.cond.Wait()
    }
    defer q.cond.L.Unlock()

    q.q.PushBack(elem)
    // (2) 通知消费goroutine已有数据对象进队列 -> (3)
    q.cond.Broadcast()
    }

    // Pop 从限容队列取出数据对象.
    func (q *CapLimitQueue) Pop(want int) []interface{} {
    q.cond.L.Lock()
    for q.q.Len() == 0 {
    // (3) 队列为空, 等待生产goroutine添加数据对象.
    q.cond.Wait()
    }
    defer q.cond.L.Unlock()

    if want > q.q.Len() {
    want = q.q.Len()
    }
    output := make([]interface{}, want)
    for i := 0; i < want; i++ {
    output[i] = q.q.PopFront()
    }
    // (4) 通知生产goroutine已有数据对象出队列 -> (1)
    q.cond.Broadcast()

    return output
    }

    // Len 返回限容队列当前长度.
    func (q *CapLimitQueue) Len() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    return q.q.Len()
    }
    2020-11-03
    4
  • Junes
    1. 是否需要等待,是看业务实现的需求吧:
    每个caller会唤醒一个或者所有的waiter
    caller和waiter的数量对应是不确定的,如N:M
    waiter唤醒后的处理逻辑是自己决定的,比如示例中的ready和队列长度

    2. 有长度限制的队列,代码可以参考示例中的PriorityQueue
    Push入队Broadcast,Pop出队Wait
    队列长度有限制的话,在Dueue中维护一个变量size,当前队列长度大于等于size时,Push操作直接返回错误
    改造:如果希望Dueue满时Push操作阻塞,可以在Push用个Wait来阻塞,收到Broadcast后,检测到当前队列小于size就Push
    附上一块伪代码

    // 从队列中取出一个元素
    func (p *Dueue) Pop() (interface{}, error) {
    p.lock.Lock()
    defer p.lock.Unlock()

    // 如果队列为空,等待,直到被唤醒
    for p.queue.Len() == 0 {
    p.cond.Wait()
    }
    return p.queue.Pop()
    }

    // 增加元素到队列中(非阻塞方式)
    func (p *Dueue) Push(e interface{}) error {
    p.lock.Lock()
    defer p.lock.Unlock()

    // 如果队列满了,直接返回error(阻塞改造:在Pop中添加个Broadcast方法,这里改造成for循环进行wait)
    if p.queue.Len() >= p.maxSize {
    return fmt.Errorf("over size")
    }

    // 把元素加入到队列后,通知所有的waiter
    p.queue.Push(e)
    p.cond.Broadcast()
    return nil
    }
    2020-10-26
    1
    3
  • myrfy
    还是没有想明白k8s为什么不能用channel通知
    close可以实现broadcast的功效,在pop的时候,也是只有一个goroutine可以拿到数据,感觉除了关闭队列之外,不存在需要broadcast的情况。也就是感觉不需要多次broadcast,这样channel应该是满足要求的……请老师明示

    作者回复: 因为需要重用,使用chan close后没法重用了

    2020-10-28
    2
  • Hurt
    打卡
    2020-11-20
    1
  • 虢國技醬
    第一个思考题其实不难,其他goroutine只是ready++后唤醒,如果等待的主goroutine不检查条件变量,主goroutine在第一次唤醒时就继续执行了!也就体现不出条件变量的“条件”了
    2020-11-08
    2
    1
  • 罗杰
    文章虽然看明白了,但是要完成思考题还是有一定的难度的。
    2020-10-28
    1
    1
  • moooofly
    请教一个问题,nocpoy 是用于 vet 静态检查,copyChecker 是为了运行时检查,都是为了检查 copy 问题,为啥 Cond 要在两处检查,而 Mutex 只需要一处?

    作者回复: 你第一句给出了答案。分别应用不同阶段

    2020-10-27
    2
    1
  • 约书亚
    第一题的答案应该应该还包括spurious wakeup的因素
    2020-10-27
    1
  • chongsheng
    关于Cond还有一种会不小心误用的场景,因为一些原因导致Wait执行的时候,Signal/Broadcast就已经执行完了,导致Wait一直等待无法唤醒。比如这里的例子
    https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond

    作者回复: 👍🏻

    2022-01-06
  • chongsheng
    关于Cond还有一种不小心误用的场景,就是在Wait()调用之前,Signal/Broadcast就执行完了,导致一直Wait()。比如这里的例子
    2022-01-06
  • 庖丁解牛
    文章中在描述 Cond 的复杂性时,说明了 3 点,第三点:「条件变量的更改」 是否可需改为:「等待条件的更改」?

    作者回复: 对

    2021-11-03
  • 徐改
    关于思考题2:
    我使用了环形队列,用来避免线性队列队满情况下因数据搬移而带来的时间开销:
    / 环形队列,实现阻塞、通知机制
    func (queue *PriorityQueue) Add(v interface{}) {
    queue.cond.L.Lock()
    defer queue.cond.L.Unlock()
    // 判断队满
    if (queue.tail + 1) % cap(queue.item) == queue.head {
    fmt.Println("生产者被阻塞")
    queue.cond.Wait()
    fmt.Println("生产者被唤醒")
    }
    fmt.Println("生产者生产数据")
    queue.item[queue.tail] = v
    queue.tail = (queue.tail + 1) % cap(queue.item)
    queue.cond.Broadcast()
    }

    func (queue *PriorityQueue) Pop() interface{} {
    queue.cond.L.Lock()
    defer queue.cond.L.Unlock()
    // 判断队空
    if queue.tail == queue.head {
    fmt.Println("消费者被阻塞")
    queue.cond.Wait()
    fmt.Println("消费者被唤醒")
    }
    fmt.Println("消费者消费数据")
    v := queue.item[queue.head]
    queue.head = (queue.head + 1) % cap(queue.item)
    queue.cond.Broadcast()
    return v
    }
    2021-10-20
  • Allen
    package main

    import (
    "fmt"
    "sync"
    "time"

    "go.uber.org/atomic"
    )

    type CondQue struct {
    c *sync.Cond
    size int
    list []int
    }

    func NewCondQue(size int) *CondQue {
    return &CondQue{size: size, c: sync.NewCond(&sync.Mutex{})}
    }

    func (q *CondQue) Add(item int) {
    q.c.L.Lock()
    defer q.c.L.Unlock()
    for len(q.list) >= q.size { // 当前队列长度已满
    q.c.Wait() // 等待
    fmt.Printf("wait for add:%v\n", item)
    }
    q.list = append(q.list, item)
    q.c.Broadcast()
    fmt.Printf("add:%v cur len:%v\n", item, len(q.list))
    }

    func (q *CondQue) Pop() int {
    q.c.L.Lock()
    defer q.c.L.Unlock()
    for len(q.list) <= 0 { // 当前队列长度已空
    q.c.Wait() // 等待
    fmt.Printf("wait for pop\n")
    }

    item := q.list[0]
    q.list = q.list[1:]
    fmt.Printf("pop:%v cur len:%v\n", item, len(q.list))
    return item
    }

    func main() {
    cond := NewCondQue(2)
    var g atomic.Int32
    for i := 0; i < 3; i++ {
    go func() {
    for {
    g.Add(1)
    cond.Add(int(g.Load()))
    time.Sleep(time.Second * 1)
    }
    }()
    }
    for i := 0; i < 2; i++ {
    go func() {
    for {
    _ = cond.Pop()
    time.Sleep(time.Millisecond * 500)
    }
    }()
    }
    time.Sleep(time.Second * 10)
    }
    2021-09-24
  • bearlu
    老师请教一个问题,如果Wait前加锁,然后执行完Wait又Unlock有什么作用,我把Wait后面的Unlock去掉,好似程序也能正常运行。是我漏了什么?

    作者回复: 那你locker就永远没释放呗

    2021-07-24
  • 吴小智
    想问老师,为什么 ready 在 Wait 之后处理,会有死锁的时候发生?
    c.L.Lock()
    for ready != 10 {
    c.Wait()
    ready ++
    fmt.Println("裁判员被唤醒",ready)
    }
    c.L.Unlock()
    2021-07-08
  • Andylee
    如果在执行 runtime_notifyListWait(&c.notify, t)之前有其他协程broadcast了,会不会永远不会醒来了?
    2021-05-20
  • 臭猫
    k8s里,p.lock.Lock() 这个在wait之前的锁,并不是p.cock.L.Lock(),这样不会有问题?
    2020-12-11
  • Eirture
    简单的实现:

    https://play.golang.org/p/GqwFSz_F7nL
    2020-11-30
收起评论
29
返回
顶部