目录

并发编程进阶篇

1. 锁简介

Mutex 可以看做是锁,而 RWMutex 则是读写锁。 一般的用法是将 Mutex 或者 RWMutex 和需要被保住的资源封装在一个结构体内。

  • 如果有多个 goroutine 同时读写的资源,就一定要保护起来。
  • 如果多个 goroutine 只读某个资源,那就不需要保护。

1.1 锁分类

使用锁的时候,优先使用 RWMutex。

  • RWMutex:核心就是四个方法,RLock(加读锁)、 RUnlock(释放读锁)、Lock(加写锁)、Unlock(释放写锁)
  • Mutex:Lock 和 Unlock(互斥锁)

互斥锁:读都不能一起读,读的时候不能写,写的时候不能读。这是最普通的互斥锁。

只读锁:如果加的是读锁,是不是同一个时间是不能两个goroutine同时读呢,答案是可以的,因为读锁是共享锁。但是读的时候是不能写的,所以叫只读锁,但是其他goroutine却是可以去读的。

写锁: 写锁是独占锁,写时不能读,读写锁是写优先的锁,基本就是写要占锁时,就算没有占成功,所有的新来的读锁全部都得靠边站,等待旧读锁释放,写锁锁定。

1.2 三种写法比较优劣

写法1~全局变量

写法一: 看下面代码声明,下面代码中,声明了两个公共的包级变量

1
2
3
4
package syncdemo
import "sync"
var PublicResource map[string]string
var Public sync.RWMutex

在另外一个包中,如下代码,直接使用了这个map类型的包及变量,这个是很危险的,别人在使用的时候,是不知道要不要加锁的,如果是并发场景,这种写法是很危险的。我做练习的时候,就是这样写的,事实上这是不严谨和规范的。

1
2
3
4
5
6
7
8
package ctx
import "go/syncdemo"
func My() {
	// 希望的写法,加锁
	syncdemo.Public.Lock()
	defer syncdemo.Public.Unlock()
	syncdemo.PublicResource["key1"] = "v1"
}
写法2~包级变量

写法二: 不推荐,勉强可以, 这种写法声明了两个私有的变量

1
2
3
// 很多中间件这样写
var privateResource map[string]string
var privateLock sync.Mutex

然后在里面人家自己加了锁,这样相对上面一种来说相对安全。

1
2
3
4
5
func NewFeature() {
	privateLock.Lock()
	defer privateLock.Unlock()
	privateResource["k2"] = "v2"
}
写法3~结构体

第三种安全写法(最规范,推荐写法。不直接访问字段,而是调用它提供的方法),也是约定俗成的方法。它定义了一个结构体和暴露给外部方法,外部使用的时候知需要调用这个方法就可以了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type safeResource struct {
	resource map[string]string
	lock     sync.RWMutex
}

func (s *safeResource) Add(key string, value string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.resource[key] = value
}

func TestMu(t *testing.T) {
	var s safeResource
	s.resource = map[string]string{}
	s.Add("name", "Jack")
	fmt.Println(s.resource)
}

func TestMu2(t *testing.T) {
	ss := safeResource{
		resource: map[string]string{}, // 一定要初始化,不然会报错
		lock:     sync.RWMutex{},
	}
	ss.Add("key", "value")
	fmt.Println(ss.resource)
}

2. 不要拿着读锁去加写锁

看看下面代码的问题在哪
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type SafeMap[K comparable, V any] struct {
	values map[K]V
	lock   sync.RWMutex
}

func (s *SafeMap[K, V]) LoadOrstoreV1(key K, newValue V) (V, bool) {
	s.lock.RLock()
	oldVal, ok := s.values[key]
	defer s.lock.RUnlock()
	if ok {
		return oldVal, true
	}
	// 加写锁
	s.lock.Lock()
	defer s.lock.Unlock()

	//double check
	oldVal, ok = s.values[key]
	if ok {
		return oldVal, true
	}
	s.values[key] = newValue
	return newValue, false
}


func TestDeferRLock(t *testing.T) {
	sm := SafeMap[string, string]{
		values: make(map[string]string, 4),
	}
	sm.LoadOrstoreV1("a", "b")
	fmt.Println(sm.values)
}

运行上面代码,直接死锁了。

1
fatal error: all goroutines are asleep - deadlock!

上面代码的问题,在于读锁没有释放,有了defer后,最后才释放这个读锁,我们拿着读锁,加写锁,写数据。事实上,加写锁的时候就已经死锁了,是加不上的。可以在加写锁前后打印一下。

3. double check

看看下面代码的问题在哪
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var wg sync.WaitGroup

func (s *SafeMap[K, V]) LoadOrstoreV2(key K, newValue V) (V, bool) {
	s.lock.RLock()
	oldVal, ok := s.values[key]
	s.lock.RUnlock()
	if ok {
		fmt.Printf("已经有了哦%v\n", s.values)
		return oldVal, true
	}
	time.Sleep(100 * time.Millisecond)

	s.lock.Lock()
	defer s.lock.Unlock()
	s.values[key] = newValue
	fmt.Printf("新的k-v已经放进去啦……%v\n", s.values)
	return newValue, false
}

func TestWithoutDouble(t *testing.T) {
	sm := SafeMap[string, int]{
		values: make(map[string]int, 4),
	}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			time.Sleep(time.Second)
			sm.LoadOrstoreV2("a", i)
		}(i)
	}
	time.Sleep(3 * time.Second)
	wg.Wait()
	fmt.Println(sm)
}

上面代码很值得分析,在测试程序中,我们开了10个goroutine。

而且这10个goroutine,由于我特意加了延时,会停在那一段时间,就是还没往里面放数据。并且第一个check是走不到那里的。一开始所有的goroutine返回的都是false,当第一个goroutine进行到下面时,给赋值了。这个时候没有第二次check的时候,另外的goroutine会直接修改掉原来的值,接下来的goroutine也会覆盖掉上一次的值。所以,我们看到了下面的结果

/并发编程/20230413174948.png

有doublecheck的场景
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (s *SafeMap[K, V]) LoadOrstoreV3(key K, newValue V) (V, bool) {
	s.lock.RLock()
	oldVal, ok := s.values[key]
	s.lock.RUnlock()
	if ok {
		fmt.Printf("第一次check已经有了哦%v\n", s.values)
		return oldVal, true
	}
	time.Sleep(100 * time.Millisecond)

	s.lock.Lock()
	defer s.lock.Unlock()
	oldVal, ok = s.values[key]
	if ok {
		fmt.Printf("第二次check已经有了哦%v\n", s.values)
		return oldVal, true
	}
	s.values[key] = newValue
	fmt.Printf("新的key已经放进去啦……%v\n", s.values)
	return newValue, false
}

func TestDoubleCheck(t *testing.T) {
	sm := SafeMap[string, int]{
		values: make(map[string]int, 4),
	}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			time.Sleep(time.Second)
			sm.LoadOrstoreV3("a", i)
		}(i)
	}
	time.Sleep(3 * time.Second)
	wg.Wait()
	fmt.Println(sm)
}

/并发编程/20230413175606.png

可以看到,加了doublecheck后,就不会被覆盖了。

在实际编码中,尤其是这种并发场景下,涉及到锁等问题,尽可能规范写法,可以避免很多坑。上面写法总结如下, 使用 RWMutex 实现 double-check:

  • 加读锁先检查一遍
  • 释放读锁
  • 加写锁
  • 再检查一遍

4. Sync.Once

sync.Once 用来确保某个动作至多执行一 次。普遍用于初始化资源,单例模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type OnceClose struct {
	close sync.Once
}

func (o *OnceClose) Close() error {
	o.close.Do(func() {
		fmt.Println("close")
	})
	return nil
}

func TestOnceClose(t *testing.T) {
	oc := &OnceClose{}
	for i := 0; i < 10; i++ {
		fmt.Println(i)
		oc.Close()
	}
}

可以看到,虽然for循环走了十次,但只打了一次close。如果我们再把方法的recevier换成值类型的receiver呢,我们来看一下结果:

1
2
3
4
5
6
func (no OnceClose) Close1() error {
	no.close.Do(func() {
		fmt.Println("Close")
	})
	return nil
}

/并发编程/20230413203133.png

分析

这个时候就不一样了,真的打印了十次,那么这又是为什么呢?其实Sync.Once的实现原理很简单,它相当于设计了一个标志位,执行过了,就改为done, 后面判断这个标志位就可以了。如果我们使用值传递方式,改的是副本的值,而原来的并没有修改。这里要强调一句话:调用方法的时候本质上是副本,一个是值的副本,一个是指针的副本。是不存在像其他语言里那种引用传递的概念的。

另外还有一种保证执行一次的方法init() 方法,一般来说,使用init()进行包级别的初始化,而且不延迟初始化。如果需要使用延迟初始化,则还是需要使用Sync.Once方法。

不知道Sync.Once是啥,点进去看就明白了。注释文档写的很详细,里面就一个方法。

5. Sync.Pool

一般情况下,如果要考虑缓存资源,比如说创建好的对象,那么可以使用sync.Pool。

  • sync.Pool会先查看自己是否有资源,有则直接返回,没有则创建一个新的
  • sync.Pool会在GC的时候释放缓存的资源

一般是用sync.Pool都是为了复用内存:

  • 它减少了内存分配,也减轻了GC压力(最主要)
  • 少消耗CPU资源(内存分配和GC都是CPU密集操作)

5.1 简单案例

简单示例

首先我们来看看这个 sync.Pool 是如何使用的,非常的简单。它一共只有三个方法我们需要知道的:New、Put、Get

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func TestPooldemo(t *testing.T) {
	var strPool = sync.Pool{
		New: func() interface{} {
			return "test str"
		},
	}
	str := strPool.Get()
	fmt.Println(str)
	strPool.Put(str)
}
  • 通过New去定义你这个池子里面放的究竟是什么东西,在这个池子里面你只能放一种类型的东西。比如在上面的例子中我就在池子里面放了字符串。
  • 我们随时可以通过Get方法从池子里面获取我们之前在New里面定义类型的数据。
  • 当我们用完了之后可以通过Put方法放回去,或者放别的同类型的数据进去。

5.2 pool示例

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func TestPool(t *testing.T) {
	pool := sync.Pool{
		New: func() any {
			fmt.Println("hhhh,new")
			return []byte{}
		},
	}
	for i := 0; i < 5; i++ {
		val := pool.Get()
		fmt.Println(val)
		pool.Put(val)
	}
}

上面的程序输出如下

1
2
3
4
5
6
7
hhhh,new
[]
-------------------
[]
[]
[]
[]

我们来分析一下,

  • 第一次for循环,我们get的时候发现,pool里面是没有的,于是,执行了new函数,并且返回了一个空的字节切片。
  • 后面的因为get到里面的空切片,没有走new这边。
修改示例

从上面可以看出,如果里面有值,是不会走到new这边,而是直接从pool里取值了。下面修改一下代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func TestPool(t *testing.T) {
	pool := sync.Pool{
		New: func() any {
			fmt.Println("hhhh,new")
			return []byte{}
		},
	}
	for i := 0; i < 5; i++ {
		val := pool.Get()
		fmt.Println(val)
		//pool.Put(val)
	}
}

输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
hhhh,new
[]
hhhh,new
[]
hhhh,new
[]
hhhh,new
[]
hhhh,new
[]

输出了5次,我们第一次取值出来用后,没有放进去,说明没有值的时候,走到New这边了。

再次验证
1
2
3
4
5
6
for i := 0; i < 5; i++ {
		val := pool.Get()
		fmt.Println(val)
		//pool.Put(val)
		pool.Put(i)
	}

输出

1
2
3
4
5
6
hhhh,new
[]
0
1
2
3

可以看到,当我们第一次取出后,继续放值到pool中,就没有走到new,取的是上一次放进去的值。

  • 有,就直接给你已经有的
  • 没有,就走new,创建新的。

5.3 重置数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type User struct {
	ID   int
	Name string
}

func (u *User) Reset() {
	u.ID = 0
	u.Name = ""
}

func TestPool2(t *testing.T) {
	pool := sync.Pool{
		New: func() any {
			return &User{}
		},
	}
	u1 := pool.Get().(*User)
	u1.ID = 12
	u1.Name = "TOM"
	u1.Reset()
	pool.Put(u1)
	u2 := pool.Get().(*User)
	fmt.Println(u2)
}

重置pool的数据,是很常见的操作,上面代码的重置操作,在将u1拿出来后,操作完成了并没有直接放进pool中,而是将u1重置了。一开始我并不明白这样做是为什么,把数据清理了再放回去,放一个空数据回去有什么意义?下一次get的是空数据。

有这个疑问,说明我没有理解pool做出来的根本目的是什么?是为了避免频繁gc,每调用一次函数都声明一个buffer的话,会申请一段空间,函数退出之后就会被标记为垃圾,但是高并发下,会导致内存过大,内存回收不及时。所以我们就不要每次都申请一个buffer,而是取上次用过的buffer清空之后,复用。它可能还会gc掉里面的buffer,更不应该用它存数据。

/并发编程/20230414103837.png

所以到这里,我理解错了,pool设计的初衷,是缓存和复用,缓解GC压力。

5.4 利用pool实现buffer池

Pool还是比较简单的,大多数情况下都可以直接使用。但是在一些场景下,我们需要更加精细的控制,那么就会尝试自己封装一下Pool。