教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 golang mutex源码详细解析

golang mutex源码详细解析

发布时间:2022-01-04   编辑:jiaochengji.com
教程集为您提供golang mutex源码详细解析等资源,欢迎您收藏本站,我们将为您提供最新的golang mutex源码详细解析资源
<svg xmlns="http://www.w3.org/2000/svg" style="display: none;"><path stroke-linecap="round" d="M5,0 0,2.5 5,5z" id="raphael-marker-block" style="-webkit-tap-highlight-color: rgba(0, 0, 0, 0);"/></svg>

目前golang的版本是1.12,其中的mutex是增加了普通模式和饥饿模式切换的优化版本,为了便于理解,这里先从上一个版本1.7版本的mutex开始分析,以后再对优化版本进行说明。

<h3>Mutex结构说明</h3> <h4>定义</h4>

最初版本锁的定义如下:

<pre><code>// mutex是互斥锁 // mutex的零值是没有加锁的 //在使用之后不能被拷贝 type Mutex struct { state int32 //状态标识 sema uint32 //信号量 } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexWaiterShift = iota ) </code></pre>

其中state是记录用来记录加锁状态的,将一个整型按位划分来表示不同的含义,从低到高分别为第1位到第32位,

<ul><li>第1位表示是否被锁住,即0表示没有锁住,mutexLocked也就是1表示已经被锁住。</li><li>第2表示是否被唤醒,1表示被唤醒,mutexWoken=2表示被唤醒。</li><li>第3位到第32位表示等待在mutex上协程数量,mutexWaiterShift=3表示在获取等待协程数量,需要将state右移位3位。</li></ul>

其中sema是信号量,是一个非负数的全局变量,下面对信号量进行简单说明。

<h4>
信号量</h4>

信号量是进程间通信处理同步互斥的机制,通过一个计数器来控制对共享资源的访问次数限制。例如一个办公室有两台打印机,有几十台电脑连上,这是同时只能允许两个电脑进行打印,而其他电脑必须排队等待完成后才能打印。

sema就是信号量,是一个非负数的全局变量,该变量有两个操作P和V,PV操作都是不可中断的。

P(S):
(1)执行S=S-1;
(2)进行以下判断:

<ul><li>如果S < 0,进入阻塞队列,直到满足S>=0</li><li>如果S >= 0, 直接返回
因此P操作执行一次意味着分配一个资源,如上打印机意味着是资源,当S小于0意味着没有可用资源了,只能一直等待,直到资源空闲出来时才能继续。</li></ul>

V(S):
(1)执行S=S 1;
(2)进行以下判断:

<ul><li>如果S > 0,直接返回</li><li>如果S <= 0, 释放阻塞队列中的第一个等待进程
因此V操作执行一次意味着释放一个资源,当S小于等于0时,意味着还有进程在请求资源,此时释放了一个资源,就需要从等待队列中拿出一个进程来使用此刻释放的资源。</li></ul><h4>
golang中信号量操作</h4>

runtime_Semacquire
func runtime_Semacquire(s *uint32),P操作,等待*s大于等于0,源码在runtime/sema.go中

runtime_Semrelease
func runtime_Semrelease, V操作,阻塞等待被唤醒,目前版本在runtime/sema.go中(定义稍有不同了)。

<h4>
为何用信号量实现互斥锁</h4>

这里借用了下面博客部分内容进行说明:https://www.cnblogs.com/niniwzw/p/3153955.html

考虑下,如果直接用信号量来实现互斥,即新建一个sema=1,然后用PV操作runtime_Semacquire和runtime_Semrelease来实现,也可以做到当一次请求时,拿到资源进行执行,后续请求阻塞,进入等待队列,不考虑性能,按照这样简单的思路实现如下:

<pre><code>type Mutex struct { sema uint32 } func NewMutex() *Mutex { var mu Mutex mu.sema = 1 return &mu } func (m *Mutex) Lock() { runtime_Semacquire(&m.sema) } func (m *Mutex2) Unlock() { runtime_Semrelease(&m.sema) } </code></pre>

这里会有一点问题是,当加锁一次,代码中解锁了两次, 会导致sema值变化而不提示任何错误,即这时sema=2,资源数量发生了变化,导致后续运行异常,所以多次解锁时需要返回异常。这里,通过多一个变量来表示加锁次数,改进代码如下:

<pre><code>type Mutex struct { key int32 sema uint32 } func (m *Mutex) Lock() { if atomic.AddInt32(&m.key, 1) == 1 { // changed from 0 to 1; we hold lock return } runtime_Semacquire(&m.sema) } func (m *Mutex) Unlock() { switch v := atomic.AddInt32(&m.key, -1); { case v == 0: // changed from 1 to 0; no contention return case v == -1: // changed from 0 to -1: wasn't locked // (or there are 4 billion goroutines waiting) panic("sync: unlock of unlocked mutex") } runtime_Semrelease(&m.sema) } </code></pre>

这个解决方案除了解决了我们前面说的重复加锁的问题外,还对我们初始化工作做了简化,不需要构造函数了。执行过程中值变化如下:

<ul><li>初始:key=0, sema = 0</li><li>Lock第一次:key 1=1返回,sema=0,即第一次不进行P操作,直接将key加1表示获取了锁。</li><li>Lock第二次:key=2,进行P操作,发现sema-1 =-1<0,阻塞等待获取锁。</li></ul>

当执行了一次Lock后,key=1,sema=0,执行以下操作时:

<ul><li>Unlock第一次:key-1=0返回,sema=0,第一次解锁不执行V操作,直接key减1表示释放锁。</li><li>Unlock第二次:key-1=-1,表示解锁过了,返回异常。</li></ul>

当执行了两次Lock后,key=2,sema=-1,执行以下操作时:

<ul><li>Unlock第一次:key-1=1,执行V操作runtime_Semrelease,发现sema 1=0,会阻塞直到唤醒了其他协程,然后返回。</li></ul>

简单来说,增加一个key变量后,sema=0表示有一个资源,跟只用信号量时sema=1含义一样,在golang mutex也是基于此实现的。

<h3>
Mutex操作解读</h3> <h4>Lock</h4>

最初版本的mutex lock如下:

<pre><code>func (m *Mutex) Lock() { -----------------代码块1 start----------------- // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } -----------------代码块1 end----------------- awoke := false iter := 0 for { -----------------代码块2 start----------------- old := m.state new := old | mutexLocked if old&mutexLocked != 0 { if runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter continue } new = old 1<<mutexWaiterShift } -----------------代码块2 end----------------- -----------------代码块3 start----------------- if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } new &^= mutexWoken } -----------------代码块3 end----------------- -----------------代码块4 start----------------- if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { break } runtime_Semacquire(&m.sema) awoke = true iter = 0 } -----------------代码块4 start----------------- } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } </code></pre>

将上面代码标注为4块,下面依次进行分析。对代码逻辑进行详细分析之前,先介绍下其中用到部分函数。

race.Acquire

<pre><code> if race.Enabled { race.Acquire(unsafe.Pointer(m)) } </code></pre>

竞争检测逻辑,。go中使用goroutine比较常见,在大型项目中可能会在多个goroutine中用到某个全局变量,如果有竞争就需要加锁操作。go提供了race检测工具,可以使用go run -race 或者 go build -race来进行竞争检测。详细可参考:https://yq.aliyun.com/articles/336467

runtime_canSpin
判断是否需要自选,golang中自旋锁并不会一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制, 传递过来的iter大等于4或者cpu核数小等于1,最大逻辑处理器大于1,至少有个本地的P队列,并且本地的P队列可运行G队列为空才会进行自旋。

<pre><code>//go:linkname sync_runtime_canSpin sync.runtime_canSpin func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle sched.nmspinning) 1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true } </code></pre>

runtime_doSpin
进行自旋操作,会调用procyield函数,该函数也是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。

<pre><code>//go:linkname sync_runtime_doSpin sync.runtime_doSpin func sync_runtime_doSpin() { procyield(active_spin_cnt) } </code></pre> <h5>
代码块1</h5> <pre><code> if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } </code></pre>

如果state=0,即没有锁住、没有唤醒且没有等待队列,可直接拿到锁,将状态置为锁住并返回,这相当于是上面demo版中从key=0,sema=0的状态,变为key=1,seme=0的状态。

<h5>
代码块2</h5> <pre><code> //最新状态 old := m.state new := old | mutexLocked //已经被锁住 if old&mutexLocked != 0 { //判断是否需要自选,这是在for循环中,iter次数可能已经超过不需要自旋了,或者其他条件 if runtime_canSpin(iter) { // 主动自旋是有意义的,因为会尝试唤醒锁, //这样上个协程此时unlock的话,就不会唤醒其他协程 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { //自己没有唤醒,且原状态没有唤醒, //且有协程在排队且设置唤醒标识成功, //说明上个的协程此时unlock了, awoke = true } //自旋一段时间 runtime_doSpin() iter continue } //不需要自旋,将state的等待队列数据加1 new = old 1<<mutexWaiterShift } </code></pre> <h5>代码块3</h5> <pre><code> if awoke { //代码块1中将awoke置为1了,标识被唤醒 //代码块1中只有设置了唤醒标识,awoke才会为true,因此不会new&mutexWoken == 0 if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } //既然当前协程被唤醒了,需要将state置为未唤醒 new &^= mutexWoken } </code></pre> <h5>代码块4</h5> <pre><code> //这里new有四种值 if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { //没有锁住,直接返回 break } //当前锁住的,阻塞在此处等待,会让出cpu runtime_Semacquire(&m.sema) //从阻塞中返回,设置当前协程被唤醒了 awoke = true iter = 0 } </code></pre>

这里new可能有四种值:

<ol><li>new := old | mutexLocked</li><li>new = old 1<<mutexWaiterShift</li><li>new := old | mutexLocked , new &^= mutexWoken</li><li>new = old 1<<mutexWaiterShift, new &^= mutexWoken</li></ol>

情况1
new := old | mutexLocked ,协程在开始自旋前或者自旋过程中,原协程已经unlock了,会出线这种情况。假设原协程为a,当前协程为b,执行如下:
a.Lock()
b.Lock()
此时b中lock可能逻辑为:

<pre><code>//state已经被锁住 -----------------代码块1 start----------------- ... -----------------代码块1 end----------------- //这里a执行 a.Unlock()将state设置为未锁住状态 awoke := false for { old := m.state new := old | mutexLocked if old&mutexLocked != 0 { //不执行此块 } if awoke { //不执行此块 } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { //执行这里 break } //不执行以下几句 runtime_Semacquire(&m.sema) awoke = true iter = 0 } } </code></pre>

或者自旋未结束前a.Unlock(),这时new = old 1<<mutexWaiterShift不会被执行,

<pre><code>//state已经被锁住 -----------------代码块1 start----------------- ... -----------------代码块1 end----------------- awoke := false for { old := m.state new := old | mutexLocked if old&mutexLocked != 0 { if runtime_canSpin(iter) { //执行此块,iter还小于active_spin次,回到for开始 //在iter小于active_spin之前,a执行了unlock,此时b会执行到代码块4 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { //先假设这里未执行,如果执行了,则是情况3的赋值逻辑了 awoke = true } continue } //不执行这句 new = old 1<<mutexWaiterShift } if awoke { //不执行此块 } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { //执行这里 break } //不执行以下几句 runtime_Semacquire(&m.sema) awoke = true iter = 0 } } </code></pre>

情况2
new = old 1<<mutexWaiterShift,这是执行完自旋流程,或者不需要执行自旋的情况,即代码运行到代码块2的new = old 1<<mutexWaiterShift,

<pre><code>//state已经被锁住 -----------------代码块1 start----------------- ... -----------------代码块1 end----------------- awoke := false for { old := m.state new := old | mutexLocked if old&mutexLocked != 0 { if runtime_canSpin(iter) { //执行此块,iter还小于active_spin次,回到for开始 //在iter小于active_spin之前,a执行了unlock,此时b会执行到代码块4 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { //先假设这里未执行,如果执行了,则是情况4的赋值逻辑了 awoke = true } continue } //执行这句 new = old 1<<mutexWaiterShift } if awoke { //不执行此块 } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { //不执行这里 break } //执行这里阻塞 runtime_Semacquire(&m.sema) awoke = true iter = 0 } } </code></pre>

情况3和情况4
这两种是情况1和2中,执行到下面语句的情况:

<pre><code>if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } </code></pre>

此时因为awoke=true,也会执行:

<pre><code>if awoke { if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } new &^= mutexWoken } </code></pre>

这种情况是在自旋过程中,设置唤醒标识成功,即本协程可以拿到锁,因此需要将唤醒标识置为0,防止其他协程获取。

<h4>
Unlock</h4>

源码如下:

<pre><code>func (m *Mutex) Unlock() { //race检测 if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 判断是否多次解锁,多次解锁则抛出异常 new := atomic.AddInt32(&m.state, -mutexLocked) if (new mutexLocked)&mutexLocked == 0 { panic("sync: unlock of unlocked mutex") } old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema) return } old = m.state } } </code></pre>

for循环中,因为old在更新,第一个if语句会在以下两种情况时返回:

<ul><li>当前锁上没有协程在等待</li><li>当前锁已经被其他协程lock了或者唤醒了</li></ul>

第二个if语句,cas原子操作将等待协程数目减1,并设置唤醒标识,阻塞在runtime_Semrelease处,直到有其他协程被唤醒才返回。
看到这里,就可以知道,唤醒操作有两种:
(1)lock函数,执行自旋过程中主动唤醒自己,会执行到awoke = true相关代码;
(2)unlock函数,原协程设置唤醒标识,本协程被动唤醒,不会执行awoke = true相关代码。

<h4>
从执行来看状态变化</h4>

假如依次执行:

<pre><code>Mutex mutex mutex.Lock() // a协程 mutex.Lock() // b协程 mutex.Lock() // c协程 mutex.Unlock() // a协程 mutex.Unlock() // b协程 mutex.Unlock() // c协程 </code></pre>

定义lockflag表示加锁位,wokenflag表示唤醒位,waitcount表示等待队列个数。

<ul><li>原始:sema = 0, lockflag = 0, waitcount = 0,wokenflag = 0</li><li>a中lock后:sema = 0, lockflag = 1, waitcount = 0,wokenflag = 0</li><li>b中lock后:sema = -1, lockflag = 1, waitcount = 1,wokenflag = 0</li><li>c中lock后:sema = -2, lockflag = 1, waitcount = 2,wokenflag中间设置为1,后面修改为0</li><li>a中unlock后:sema = -1, lockflag = 1, waitcount = 1,wokenflag 中间设置为1,后面被其他协程(这里为b)修改为0。</li><li>b中unlock后:sema = 0, lockflag = 1, waitcount = 0,wokenflag 中间设置为1,后面被其他协程(这里为a)修改为0。</li><li>a中unlock后:sema = 0, lockflag = 0, waitcount = 0,wokenflag = 0。
这里考虑的是比较简单的情况,不过对于理解代码逻辑已足够。</li></ul><h3>
总结</h3>

这里对golang的上一个版本的mutex进行了解读,借鉴了一些博客的部分内容,加入了很多自己的理解过程,希望能够容易理解一点。后续会对新版本的mutex也做下解读。

<h3>
参考</h3>

https://www.cnblogs.com/niniwzw/p/3153955.html
http://www.legendtkl.com/2016/10/23/golang-mutex/

到此这篇关于“golang mutex源码详细解析”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
golang goroutine实现_golang中的Mutex设计原理详解(一)
golang中的Mutex设计原理详解(一)
golang mutex源码详细解析
Golang Once源码解析
并发同时访问_快速掌握Golang锁机制和并发基础
GO 互斥锁实现原理剖析
想系统学习GO语言(Golang
Goroutine 的同步(第三部分)
golang会取代php吗
golang url 收集

[关闭]
~ ~