golang-sync.RWMutex

使用场景

sync.Mutex提供了互斥锁,可以保证在同一时间段内,有且仅有一个goroutine持有锁和操作共享资源。其余goroutine只有在互斥锁被释放,成功获取到锁之后,才能操作共享资源

对共享资源的操作其实可以分为两种:

  • 读操作,不会改变共享资源
  • 写操作,会改变共享资源

在实际业务中,往往是读操作次数大于写操作次数,sync.Mutex提供的互斥锁,不能支持并发的读操作,所以就有了sync.RWMutex

sync.RWMutex有以下特点:

  • 在同一时间段,可以有多个goroutine获取到读锁,即读共享
  • 在同一时间段,只能有一个goroutine获取到写锁,即写互斥
  • 在同一时间段,只能存在读锁或写锁,即读写互斥

如何使用

RWMutex结构如下:

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}

// RWMutex提供了以下几个方法
// 加读锁
func (rw *RWMutex) RLock() {}
// 解读锁
func (rw *RWMutex) RUnlock() {}
// 尝试加读锁
func (rw *RWMutex) TryRLock() bool {}
// 加写锁
func (rw *RWMutex) Lock() {}
// 解写锁
func (rw *RWMutex) Unlock() {}
// 尝试加写锁
func (rw *RWMutex) TryLock() bool {}
// 返回一个Locker接口
func (rw *RWMutex) RLocker() Locker {}

使用RWMutex进行读写锁演示代码:

func TestRWMutexLock(t *testing.T) {
   var rw sync.RWMutex
   var wg sync.WaitGroup
   for i := 0; i < 5; i++ {
      go func() {
         wg.Add(1)
         defer wg.Done()
         // 读锁
         rw.RLock()
         defer rw.RUnlock()
         time.Sleep(1 * time.Second)
         fmt.Println("读操作")
      }()
   }

   for i := 0; i < 5; i++ {
      go func() {
         wg.Add(1)
         defer wg.Done()
         // 写锁
         rw.Lock()
         defer rw.Unlock()
         time.Sleep(1 * time.Second)
         fmt.Println("写操作")
      }()
   }
   wg.Wait()
}

底层原理

字段含义

const rwmutexMaxReaders = 1 << 30

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}
  • rwmutexMaxReaders:表示RWMutex能接受的最大读操作数量,超过最大数量就会panic
  • w:互斥锁,用于实现互斥写操作
  • writerSem:写操作信号量,用于写操作的阻塞和唤醒。当存在正在执行的读操作时,写操作会被阻塞;当读操作全部完成后,通过writerSem写操作信号量来唤醒写操作
  • readerSem:读操作信号量,用于读操作的阻塞和唤醒。当存在正在执行的写操作时,读操作会被阻塞;当写操作完成后,通过readerSem读操作信号量唤醒读操作
  • readerCount:正在执行的读操作数量,当不存在写操作时,从0开始计数,通过正数来表示;当存在写操作时,从负的rwmutexMaxReaders开始计数,通过负数来表示
  • readerWait:写操作等待读操作的数量,当执行Lock方法时,如果当前存在正在执行的读操作,会将正在执行的读操作数量记录在readerWait中,并阻塞写操作;当读操作执行完成后,会更新readerWait;当readerWait为0时,会唤醒写操作
  • RWMutex具有写操作优先的特点,写操作发生时,只允许正在执行的读操作继续执行完成,后续新来的读操作都会被阻塞,直到写操作完成后进行唤醒

Lock

func (rw *RWMutex) Lock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   // 加锁,保证写操作互斥
   rw.w.Lock()
   // 将readerCount更新为负值,表示当前有写操作
   // 当readerCount为负数时,新的读操作会被阻塞
   // r表示当前正在执行的读操作数量
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   // r != 0 表示当前存在正在执行的读操作
   // 把当前正在执行的读操作数量更新到readerWait中
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      // 阻塞写操作,等待读操作执行完后唤醒
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
   }
}

先通过Mutex进行加锁,保证写操作互斥

将readerCount更新为负值,表示当前有写操作。当readerCount为负数时,新的读操作会被阻塞

若当前存在正在执行的读操作,把当前正在执行的读操作数量更新到readerWait中

阻塞当前写操作,等待读操作执行完后唤醒

Unlock

func (rw *RWMutex) Unlock() {
   if race.Enabled {
      _ = rw.w.state
      race.Release(unsafe.Pointer(&rw.readerSem))
      race.Disable()
   }
   // 将readerCount更新为正数,表示当前没有写操作
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   // 唤醒所有等待的读操作
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
   // 释放锁
   rw.w.Unlock()
   if race.Enabled {
      race.Enable()
   }
}

将readerCount更新为正数,表示当前没有写操作

若存在等待的读操作,则唤醒所有等待的读操作

释放互斥锁

RLock

func (rw *RWMutex) RLock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   // 原子更新readerCount+1,表示读操作数量+1
   // 若readerCount+1为负数,表示当前存在写操作,读操作会被阻塞,等待写操作完成后被唤醒
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}

原子更新readerCount+1,读操作数量+1

如果readerCount+1为负数,则表示当前存在写操作,此时需要加锁的读操作会被阻塞,等待写操作完成后被唤醒

RUnlock

func (rw *RWMutex) RUnlock() {
   if race.Enabled {
      _ = rw.w.state
      race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      race.Disable()
   }
   // 原子更新readerCount-1,表示读操作数量-1
   // 若readerCount-1为负数,表示当前读操作阻塞了写操作,需要进行额外处理
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
   }
   if race.Enabled {
      race.Enable()
   }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // 原子更新readerWait-1,表示阻塞写操作的读操作数量-1
   // 当readerWait-1为0时,表示导致写操作阻塞的所有读操作都已经执行完成,此时需要把阻塞的写操作唤醒
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}

原子更新readerCount-1,表示读操作数量-1

若readerCount-1为负数,表示当前读操作阻塞了写操作,需要进行额外处理

原子更新readerWait-1,表示阻塞写操作的读操作数量-1

当readerWait-1为0时,表示导致写操作阻塞的所有读操作都已经执行完成,此时需要把阻塞的写操作唤醒

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章