Go 并发编程实战课
晁岳攀(鸟窝)
前微博技术专家,知名微服务框架 rpcx 作者
25635 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 22 讲
Go 并发编程实战课
15
15
1.0x
00:00/00:00
登录|注册

04| Mutex:骇客编程,如何拓展额外功能?

实现线程安全的队列
获取等待者的数量等指标
TryLock
拓展功能
易错场景
实现原理
基本用法
Mutex
Mutex知识地图

该思维导图由 AI 生成,仅供参考

你好,我是鸟窝。
前面三讲,我们学习了互斥锁 Mutex 的基本用法、实现原理以及易错场景,可以说是涵盖了互斥锁的方方面面。如果你能熟练掌握这些内容,那么,在大多数的开发场景中,你都可以得心应手。
但是,在一些特定的场景中,这些基础功能是不足以应对的。这个时候,我们就需要开发一些扩展功能了。我来举几个例子。
比如说,我们知道,如果互斥锁被某个 goroutine 获取了,而且还没有释放,那么,其他请求这把锁的 goroutine,就会阻塞等待,直到有机会获得这把锁。有时候阻塞并不是一个很好的主意,比如你请求锁更新一个计数器,如果获取不到锁的话没必要等待,大不了这次不更新,我下次更新就好了,如果阻塞的话会导致业务处理能力的下降。
再比如,如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标
实际上,不论是不希望锁的 goroutine 继续等待,还是想监控锁,我们都可以基于标准库中 Mutex 的实现,通过 Hacker 的方式,为 Mutex 增加一些额外的功能。这节课,我就来教你实现几个扩展功能,包括实现 TryLock,获取等待者的数量等指标,以及实现一个线程安全的队列。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了如何通过Hacker的方式为Mutex增加额外功能,包括实现TryLock、获取等待者的数量等指标以及实现一个线程安全的队列。作者首先详细介绍了如何实现TryLock方法,通过对Mutex结构进行扩展,使得在请求锁时,如果锁已被其他goroutine持有或处于唤醒状态,则直接返回false,避免阻塞等待。其次,作者讲解了如何通过unsafe操作获取Mutex结构中的state字段,从而实现获取等待者数量等指标的功能。最后,作者提供了一些查询方法,如IsLocked、IsWoken和IsStarving,用于实时获取锁的状态信息。此外,文章还介绍了如何使用Mutex实现一个线程安全的队列,通过Mutex为非线程安全的数据结构提供线程安全的访问。整篇文章以实际代码为例,详细讲解了如何通过Hacker的方式为Mutex增加额外功能,对于想要深入了解Mutex的读者来说,是一篇非常有价值的文章。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Go 并发编程实战课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(36)

  • 最新
  • 精选
  • +1day
    老师您好,在获取等待者数量的代码中 如果要加上锁持有者的数量的话,为什么不是 v = v >> mutexWaiterShift + (v & mutexLocked) 而是 v = v >> mutexWaiterShift //得到等待者的数值 v = v + (v & mutexLocked) //再加上锁持有者的数量,0或者1 这样呢?第一步修改了 v 的值,v 的第一位已经不再是记录该锁是否被持有了,那 v&mutexLocked 是不是不对呢?

    作者回复: 你说的对

    2020-10-23
    7
    22
  • Panmax
    如果底层 Mutex 的 state 在某个版本中含义变了,上边写的 TryLock 和监控锁的一些方法就会失效,所以这样做是不是比较危险。

    作者回复: 是的.这只是hack方式,和go的版本有关系。

    2020-10-24
    4
    7
  • 不二
    请教一个基础问题,为啥 (*int32)(unsafe.Pointer(&m.Mutex)) 可以获取sync.Mutex中state的值,Mutex结构中不是还有sema吗?

    作者回复: 只取第一个用

    2020-10-20
    7
    5
  • Gojustforfun
    1)『获取等待者的数量等指标』小节,『第 15 行我们左移三位(这里的常量 mutexWaiterShift 的值为 3)』应该是右移三位。 2)在now~now+timout内,间隔重试调用TryLock

    作者回复: 多谢

    2020-10-20
    2
  • CrazyCodes
    你可以为 Mutex 获取锁时加上 Timeout 机制吗?会有什么问题吗? 如果加上timeout机制,就不要用defer 去unlock,因为需要自行判断超时时间,然后直接unlock,如果defer再unlock就会触发panic

    作者回复: 赞

    2023-11-27归属地:北京
    1
  • 天空之城
    关于 RWLock 的扩展,我这边给出一段代码(评论不好贴代码,贴个 share link) https://go.dev/play/p/X4YNwqZR4ta ```go package sync import ( "sync" "sync/atomic" "unsafe" ) type RWMutex struct { rw sync.RWMutex } const ( mutexLocked = 1 << iota mutexWake mutexStarving mutexWaiterShift = iota ) const ( rwmutexMaxReaders = 1 << 30 ) func (e *RWMutex) TryLock() bool { if e.GetReader() < 0 { return false } e.rw.Lock() return true } // readerCount > 0 => Reader Hold without Writer, <0 => Reader Hold and Writer waiting, ==0 => no reader // state.mutexLocked == 1 => Writer Hold (in the meanwhile, readerWaiter==0, all reader(readerCount) are waiting) func (e *RWMutex) IsLocked() bool { return e.IsWriterLocked() || e.IsReaderLocked() } func (e *RWMutex) IsWriterLocked() bool { state := atomic.LoadInt32((*int32)(unsafe.Pointer(&e.rw))) return state&mutexLocked == mutexLocked } func (e *RWMutex) IsReaderLocked() bool { return atomic.LoadInt32(e.readerCountPtr()) != 0 } func (e *RWMutex) HasWriter() bool { return atomic.LoadInt32(e.readerCountPtr()) < 0 } func (e *RWMutex) GetWriter() int32 { state := atomic.LoadInt32((*int32)(unsafe.Pointer(&e.rw))) return int32((state >> mutexWaiterShift) + (state & mutexLocked)) } func (e *RWMutex) HasReader() bool { return atomic.LoadInt32(e.readerCountPtr()) != 0 } func (e *RWMutex) GetReader() int32 { r := atomic.LoadInt32(e.readerCountPtr()) if r < 0 { r += rwmutexMaxReaders } return r } func (e *RWMutex) readerCountPtr() *int32 { return (*int32)(unsafe.Pointer(uintptr(unsafe.Pointer(&e.rw)) + unsafe.Sizeof(sync.Mutex{}) + unsafe.Sizeof(uint32(0))*2)) } func (e *RWMutex) readerWaitPtr() *int32 { return (*int32)(unsafe.Pointer(uintptr(unsafe.Pointer(&e.rw)) + unsafe.Sizeof(sync.Mutex{}) + unsafe.Sizeof(uint32(0))*2 + unsafe.Sizeof((int32(0))))) } ```

    作者回复: 👍🏻

    2023-05-12归属地:北京
    1
  • 斯蒂芬.赵
    fast path执行失败,直接返回false不就行了,为啥还要往下执行?正常不是多个携程并发只有一个执行成功,其他都是失败么?

    作者回复: 尝试和唤醒的goroutine抢一抢。 当然为了简单不易出错,return false更好,容易理解,不易出错

    2021-05-07
    3
    1
  • 路过
    想问一下,为什么最后实现线程安全的队列里面的 Dequeue() 方法释放锁不用defer,这样不用写两次unlock

    作者回复: 尽早释放锁

    2023-04-17归属地:广东
  • 鲁迅原名周树人
    老师您好, // 如果能成功抢到锁 if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) { return true } 在以上代码中,(*int32)(unsafe.Pointer(&m.Mutex))是取的Mutex中state的首地址对嘛?

    作者回复: 对

    2021-05-03
    2
  • 黄毅
    func (m *Mutex) TryLock() bool { // 如果能成功抢到锁 if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) { return true } // 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex))) fmt.Println("===old===:", old) if old&(mutexLocked|mutexStarving|mutexWoken) != 0 { return false } // 尝试在竞争的状态下请求锁 new := old | mutexLocked fmt.Println("===new===:", new) //请问在什么情况下会执行到这里 return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new) } func main() { var mu Mutex go func() { // 启动一个goroutine持有一段时间的锁 mu.Lock() time.Sleep(time.Duration(rand.Intn(2)) * time.Second) mu.Unlock() }() time.Sleep(time.Second) n := int(10) var wg sync.WaitGroup wg.Add(n) for i := 0; i < n; i++ { go func() { ok := mu.TryLock() // 尝试获取到锁 defer wg.Done() if ok { // 获取成功 fmt.Println("got the lock") // do something mu.Unlock() return } }() } // 没有获取到 wg.Wait() } 老师,你好。在main中尝试编写一段逻辑测试TryLock方法,请问在什么情况下会执行fmt.Println("===new===:", new) 请老师答疑解惑,谢谢。

    作者回复: 在大并发竞争锁,在释放锁的时候可能会出现。很极端的情况,可能难以模拟,但是通过分析mutex的lock/unlock代码应该能分析出来。 另外如果trylick简单实现,可以只保留第一段,其它情况返回false即可

    2020-11-16
收起评论
显示
设置
留言
36
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部