手牵手教你阅读golang之sync.Cond

一、什么是sync.Cond

要理解这个玩意,首先得知道它是什么,其实说白了这玩意(sync.Cond)就是实现了一个条件变量,它是一个或者一组goroutine等待被唤醒的一个条件判断点,每个Cond都有一个关联的Locker L(通常是* Mutex或* RWMutex),与互斥锁和读写锁不同的是,简单的声明无法创建出来一个可用的条件变量,为了得到一个这样的条件变量我们需要使用到NewCond方法。有些抽象吧,不急我们一起阅读下段代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func main() {
	for i := 0; i < 10; i++ {
		go func(x int) {
			cond.L.Lock()         //获取锁
			defer cond.L.Unlock() //释放锁
			cond.Wait()           //等待通知,阻塞当前goroutine
			fmt.Println(x)
			time.Sleep(time.Second * 1)
		}(i)
	}
	time.Sleep(time.Second * 1)
	fmt.Println("Signal...")
	cond.Signal() // 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 1)
	cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 3)
	cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
	fmt.Println("Broadcast...")
	time.Sleep(time.Second * 60)
}

// 结果输出
Signal...
0
3
Broadcast...
1
4
9
2
6
5
7
8

可以得到结论:

(1)goroutine调用wait()方法进行阻塞,调用signal()唤起单个协程,调用broadcast唤起全部协程;

(2)在调用wait()函数之前需要加锁,也需要在最后手动释放锁;

接下来看看sync.Cond内部结构体实现。


二、sync.Cond结构体说明

sync.Cond

type Cond struct {
	noCopy noCopy  // Cond使用后不允许拷贝
	L Locker
	notify  notifyList   //通知列表调用wait()方法的goroutine会被放到notifyList中
	checker copyChecker  //检查Cond实例是否被复制
}

noCopy

type noCopy struct{}
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

noCopy可以嵌入到结构中,在第一次使用后不可复制,使用go vet作为检测使用。

L

type Locker interface {
	Lock()
	Unlock()
}

L是匿名接口类型,用来继承实现了Locker里面方法的类型,也可以重写里面的方法。在sync.Cond中可以传入一个读写锁或互斥锁,当修改条件或者调用wait 方法时需要加锁。

notify

type notifyList struct {
	wait uint32
	notify uint32
	lock mutex
	head *sudog
	tail *sudog
}

notify:对应notifyList类型,其数据结构映射到 src/runtime/sema.go 中的notifyList。wait:下一个等待goroutine的ticket,是原子的,在锁之外递增。notify:下一个通知的goroutine的ticket,在锁之外读取,但只能在持有锁的情况下写入。lock:需要传入的锁标记。head:基于 *sudog的双向链表的前驱指针。tail:基于 *sudog的双向链表的后继指针。

包含了3类字段:

  • wait和notify两个无符号整型,分别表示了Wait()操作的次数和goroutine被唤醒的次数,wait应该是恒大于等于notify。
  • lock mutex 这个跟sync.Mutex我们分析信号量阻塞队列时semaRoot里的mutex一样,并不是Go提供开发者使用的sync.Mutex,而是系统内部运行时实现的一个简单版本的互斥锁。
  • head和tail看名字,我们就能脑补出跟链表很像 没错这里就是维护了阻塞在当前sync.Cond上的goroutine构成的链表

整体来讲sync.Cond大体结构为:

手牵手教你阅读golang之sync.Cond

sync.Cond结构

checker

type copyChecker uintptr 

func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}

copyChecker保留指向自身的指针以检测对象的复制。

大致意思是说,初始type copyChecker uintptr默认为0,当第一次调用check()会将

copyChecker自身的地址复制给自己,至于为什么uintptr(*c) != uintptr(unsafe.Pointer(c))会被调用2次,期间goroutine可能已经改变copyChecker。二次调用如果不相等,则说明sync.Cond被复制,重新分配了内存地址。

三、四种notify处理函数

runtime_notifyListAdd

在src/runtime/sema.go中被实现 notifyListAdd ;表示将调用者添加到 notify 列表中,以便它可以接收通知。

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}

此处原子性操作l.wait的值,返回值代表当前协程的ticket编号,唤醒时此值也需要用到。

runtime_notifyListWait

在src/runtime/sema.go中被实现 notifyListWait ;表示将当前goroutine休眠,等待通知。接到通知以后才会被唤醒。

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
    lockWithRank(&l.lock, lockRankNotifyList)

    // 如果已经被唤醒 则立即返回
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
  // 把等待递增序号赋值给s.ticket 为FIFO打基础
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
  // 将当前goroutine插入到notifyList链表中
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
  // 最终调用gopark挂起当前goroutine
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
  // goroutine被唤醒后释放sudog
    releaseSudog(s)
}

此函数主要做两件事:

(1)获取Sudog结构体并初始化,把该结构体插入notifyList链表中;

(2)调用gopark将当前goroutine挂起;

runtime_notifyListNotifyOne

在src/runtime/sema.go中被实现 notifyListNotifyOne ;表示发送通知,唤醒 notify 列表中的一个协程。

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
  // wait==notify 说明没有等待的goroutine了
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }
    lockWithRank(&l.lock, lockRankNotifyList)
    // 锁下二次检查
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    // 更新下一个需要被唤醒的ticket number
    atomic.Store(&l.notify, t+1)

    // Try to find the g that needs to be notified.
    // If it hasn't made it to the list yet we won't find it,
    // but it won't park itself once it sees the new notify number.
    //
    // This scan looks linear but essentially always stops quickly.
    // Because g's queue separately from taking numbers,
    // there may be minor reorderings in the list, but we
    // expect the g we're looking for to be near the front.
    // The g has others in front of it on the list only to the
    // extent that it lost the race, so the iteration will not
    // be too long. This applies even when the g is missing:
    // it hasn't yet gotten to sleep and has lost the race to
    // the (few) other g's that we find on the list.
  //这里是FIFO实现的核心 其实就是遍历链表 sudog.ticket查找指定需要唤醒的节点
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

主要逻辑:

  1. 判断是否存在等待需要被唤醒的goroutine 没有直接返回
  2. 递增notify属性,因为是根据notify和sudog.ticket匹配来查找需要唤醒的goroutine,因为其是递增生成的,故而有了FIFO语义。
  3. 遍历notifyList持有的链表,从head开始依据next指针依次遍历。这个过程是线性的,故而时间复杂度为O(n),不过官方说法这个过程实际比较快This scan looks linear but essentially always stops quickly.

有个小细节:还记得我们Wait()操作中,wait属性原子更新和goroutine插入等待链表是两个单独的步骤,所以存在竞争的情况下,链表中的节点可能会轻微的乱序产生。但是不要担心,因为ticket是原子递增的 所以唤醒顺序不会乱。

runtime_notifyListNotifyAll

在src/runtime/sema.go中被实现 notifyListNotifyAll ;表示发送通知,唤醒 notify 列表中的所有协程。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
    // Fast-path 无等待goroutine直接返回
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    // 直接更新notify=wait
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // 依次调用goready唤醒goroutine
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Broadcast()与Singal()区别主要是它可以唤醒全部等待的goroutine,并直接将wait属性的值赋值给notify。

四、源码实现

NewCond

// 初始化条件变量,l可以是读写锁或者是互斥锁
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

Wait

// 阻塞当前goroutine,并等待条件触发,必须获得锁之后才能调用wait方法
func (c *Cond) Wait() {
        // 检查c是否被复制,如果是则panic
	c.checker.check()
        // 将当前goroutine加入等待队列
	t := runtime_notifyListAdd(&c.notify)
        // 注意这里,必须先解锁,因为 runtime_notifyListWait 要切走 goroutine
	// 所以这里要解锁,要不然其他 goroutine 没法获取到锁了
	c.L.Unlock() 
        // 等待通知队列中的所有的goroutine执行等待唤醒操作
	runtime_notifyListWait(&c.notify, t)
        // 等待唤醒,因此需要再度锁上
	c.L.Lock() 
}
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

Signal

// 唤起一个等待的goroutine
func (c *Cond) Signal() {
	c.checker.check()//检测c是否被复制,如果是则panic
        // 通知等待列表中的一个
	runtime_notifyListNotifyOne(&c.notify)
}

Broadcast

// 唤醒等待通知队列中的goroutine
func (c *Cond) Broadcast() {
	c.checker.check() //检测c是否被复制,如果是则panic
        // 唤醒等待通知队列中的goroutine
	runtime_notifyListNotifyAll(&c.notify)
}

总结

  1. sync.Cond一旦创建使用 不允许被拷贝,由noCopy和copyChecker来限制保护。
  2. Wait()操作先是递增notifyList.wait属性 然后将goroutine封装进sudog,将notifyList.wait赋值给sudog.ticket,然后将sudog插入notifyList链表中
  3. Singal()实际是按照notifyList.notify跟notifyList链表中节点的ticket匹配 来确定唤醒的goroutine,因为notifyList.notify和notifyList.wait都是原子递增的,故而有了FIFO的语义
  4. Broadcast()相对简单 就是唤醒全部等待的goroutine


如果看完有什么疑问或建议,请在评论区或私信和我交流吧!

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章