1. 锁简介
Mutex
可以看做是锁,而 RWMutex
则是读写锁。 一般的用法是将 Mutex 或者 RWMutex 和需要被保住的资源封装在一个结构体内。
- 如果有多个 goroutine 同时读写的资源,就一定要保护起来。
- 如果多个 goroutine 只读某个资源,那就不需要保护。
1.1 锁分类
使用锁的时候,优先使用 RWMutex。
- RWMutex:核心就是四个方法,RLock(加读锁)、 RUnlock(释放读锁)、Lock(加写锁)、Unlock(释放写锁)
- Mutex:Lock 和 Unlock(互斥锁)
互斥锁:
读都不能一起读,读的时候不能写,写的时候不能读。这是最普通的互斥锁。
只读锁:
如果加的是读锁,是不是同一个时间是不能两个goroutine同时读呢,答案是可以的,因为读锁是共享锁
。但是读的时候是不能写的,所以叫只读锁,但是其他goroutine却是可以去读的。
写锁
: 写锁是独占锁
,写时不能读,读写锁是写优先的锁,基本就是写要占锁时,就算没有占成功,所有的新来的读锁全部都得靠边站,等待旧读锁释放,写锁锁定。
1.2 三种写法比较优劣
写法1~全局变量
写法一: 看下面代码声明,下面代码中,声明了两个公共的包级变量
1
2
3
4
| package syncdemo
import "sync"
var PublicResource map[string]string
var Public sync.RWMutex
|
在另外一个包中,如下代码,直接使用了这个map类型的包及变量,这个是很危险的,别人在使用的时候,是不知道要不要加锁的,如果是并发场景,这种写法是很危险的。我做练习的时候,就是这样写的,事实上这是不严谨和规范的。
1
2
3
4
5
6
7
8
| package ctx
import "go/syncdemo"
func My() {
// 希望的写法,加锁
syncdemo.Public.Lock()
defer syncdemo.Public.Unlock()
syncdemo.PublicResource["key1"] = "v1"
}
|
写法2~包级变量
写法二: 不推荐,勉强可以, 这种写法声明了两个私有的变量
1
2
3
| // 很多中间件这样写
var privateResource map[string]string
var privateLock sync.Mutex
|
然后在里面人家自己加了锁,这样相对上面一种来说相对安全。
1
2
3
4
5
| func NewFeature() {
privateLock.Lock()
defer privateLock.Unlock()
privateResource["k2"] = "v2"
}
|
写法3~结构体
第三种安全写法(最规范,推荐写法。不直接访问字段,而是调用它提供的方法),也是约定俗成的方法。它定义了一个结构体和暴露给外部方法,外部使用的时候知需要调用这个方法就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| type safeResource struct {
resource map[string]string
lock sync.RWMutex
}
func (s *safeResource) Add(key string, value string) {
s.lock.Lock()
defer s.lock.Unlock()
s.resource[key] = value
}
func TestMu(t *testing.T) {
var s safeResource
s.resource = map[string]string{}
s.Add("name", "Jack")
fmt.Println(s.resource)
}
func TestMu2(t *testing.T) {
ss := safeResource{
resource: map[string]string{}, // 一定要初始化,不然会报错
lock: sync.RWMutex{},
}
ss.Add("key", "value")
fmt.Println(ss.resource)
}
|
2. 不要拿着读锁去加写锁
看看下面代码的问题在哪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| type SafeMap[K comparable, V any] struct {
values map[K]V
lock sync.RWMutex
}
func (s *SafeMap[K, V]) LoadOrstoreV1(key K, newValue V) (V, bool) {
s.lock.RLock()
oldVal, ok := s.values[key]
defer s.lock.RUnlock()
if ok {
return oldVal, true
}
// 加写锁
s.lock.Lock()
defer s.lock.Unlock()
//double check
oldVal, ok = s.values[key]
if ok {
return oldVal, true
}
s.values[key] = newValue
return newValue, false
}
func TestDeferRLock(t *testing.T) {
sm := SafeMap[string, string]{
values: make(map[string]string, 4),
}
sm.LoadOrstoreV1("a", "b")
fmt.Println(sm.values)
}
|
运行上面代码,直接死锁了。
1
| fatal error: all goroutines are asleep - deadlock!
|
上面代码的问题,在于读锁没有释放,有了defer后,最后才释放这个读锁,我们拿着读锁,加写锁,写数据。事实上,加写锁的时候就已经死锁了,是加不上的。可以在加写锁前后打印一下。
3. double check
看看下面代码的问题在哪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| var wg sync.WaitGroup
func (s *SafeMap[K, V]) LoadOrstoreV2(key K, newValue V) (V, bool) {
s.lock.RLock()
oldVal, ok := s.values[key]
s.lock.RUnlock()
if ok {
fmt.Printf("已经有了哦%v\n", s.values)
return oldVal, true
}
time.Sleep(100 * time.Millisecond)
s.lock.Lock()
defer s.lock.Unlock()
s.values[key] = newValue
fmt.Printf("新的k-v已经放进去啦……%v\n", s.values)
return newValue, false
}
func TestWithoutDouble(t *testing.T) {
sm := SafeMap[string, int]{
values: make(map[string]int, 4),
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
time.Sleep(time.Second)
sm.LoadOrstoreV2("a", i)
}(i)
}
time.Sleep(3 * time.Second)
wg.Wait()
fmt.Println(sm)
}
|
上面代码很值得分析,在测试程序中,我们开了10个goroutine。
而且这10个goroutine,由于我特意加了延时,会停在那一段时间,就是还没往里面放数据。并且第一个check是走不到那里的。一开始所有的goroutine返回的都是false,当第一个goroutine进行到下面时,给赋值了。这个时候没有第二次check的时候,另外的goroutine会直接修改掉原来的值,接下来的goroutine也会覆盖掉上一次的值。所以,我们看到了下面的结果
有doublecheck的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| func (s *SafeMap[K, V]) LoadOrstoreV3(key K, newValue V) (V, bool) {
s.lock.RLock()
oldVal, ok := s.values[key]
s.lock.RUnlock()
if ok {
fmt.Printf("第一次check已经有了哦%v\n", s.values)
return oldVal, true
}
time.Sleep(100 * time.Millisecond)
s.lock.Lock()
defer s.lock.Unlock()
oldVal, ok = s.values[key]
if ok {
fmt.Printf("第二次check已经有了哦%v\n", s.values)
return oldVal, true
}
s.values[key] = newValue
fmt.Printf("新的key已经放进去啦……%v\n", s.values)
return newValue, false
}
func TestDoubleCheck(t *testing.T) {
sm := SafeMap[string, int]{
values: make(map[string]int, 4),
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
time.Sleep(time.Second)
sm.LoadOrstoreV3("a", i)
}(i)
}
time.Sleep(3 * time.Second)
wg.Wait()
fmt.Println(sm)
}
|
可以看到,加了doublecheck后,就不会被覆盖了。
在实际编码中,尤其是这种并发场景下,涉及到锁等问题,尽可能规范写法,可以避免很多坑。上面写法总结如下, 使用 RWMutex 实现 double-check:
4. Sync.Once
sync.Once 用来确保某个动作至多执行一 次。普遍用于初始化资源,单例模式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| type OnceClose struct {
close sync.Once
}
func (o *OnceClose) Close() error {
o.close.Do(func() {
fmt.Println("close")
})
return nil
}
func TestOnceClose(t *testing.T) {
oc := &OnceClose{}
for i := 0; i < 10; i++ {
fmt.Println(i)
oc.Close()
}
}
|
可以看到,虽然for循环走了十次,但只打了一次close。如果我们再把方法的recevier换成值类型的receiver呢,我们来看一下结果:
1
2
3
4
5
6
| func (no OnceClose) Close1() error {
no.close.Do(func() {
fmt.Println("Close")
})
return nil
}
|
分析
这个时候就不一样了,真的打印了十次,那么这又是为什么呢?其实Sync.Once的实现原理很简单,它相当于设计了一个标志位,执行过了,就改为done, 后面判断这个标志位就可以了。如果我们使用值传递方式,改的是副本的值,而原来的并没有修改。这里要强调一句话:调用方法的时候本质上是副本,一个是值的副本,一个是指针的副本。是不存在像其他语言里那种引用传递的概念的。
另外还有一种保证执行一次的方法init()
方法,一般来说,使用init()进行包级别的初始化,而且不延迟初始化。如果需要使用延迟初始化,则还是需要使用Sync.Once
方法。
不知道Sync.Once
是啥,点进去看就明白了。注释文档写的很详细,里面就一个方法。
5. Sync.Pool
一般情况下,如果要考虑缓存资源,比如说创建好的对象,那么可以使用sync.Pool。
- sync.Pool会先查看自己是否有资源,有则直接返回,没有则创建一个新的
- sync.Pool会在GC的时候释放缓存的资源
一般是用sync.Pool都是为了复用内存:
- 它减少了内存分配,也减轻了GC压力(最主要)
- 少消耗CPU资源(内存分配和GC都是CPU密集操作)
5.1 简单案例
简单示例
首先我们来看看这个 sync.Pool 是如何使用的,非常的简单。它一共只有三个方法我们需要知道的:New、Put、Get
1
2
3
4
5
6
7
8
9
10
| func TestPooldemo(t *testing.T) {
var strPool = sync.Pool{
New: func() interface{} {
return "test str"
},
}
str := strPool.Get()
fmt.Println(str)
strPool.Put(str)
}
|
- 通过
New
去定义你这个池子里面放的究竟是什么东西,在这个池子里面你只能放一种类型的东西。比如在上面的例子中我就在池子里面放了字符串。 - 我们随时可以通过
Get
方法从池子里面获取我们之前在New里面定义类型的数据。 - 当我们用完了之后可以通过
Put
方法放回去,或者放别的同类型的数据进去。
5.2 pool示例
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
| func TestPool(t *testing.T) {
pool := sync.Pool{
New: func() any {
fmt.Println("hhhh,new")
return []byte{}
},
}
for i := 0; i < 5; i++ {
val := pool.Get()
fmt.Println(val)
pool.Put(val)
}
}
|
上面的程序输出如下
1
2
3
4
5
6
7
| hhhh,new
[]
-------------------
[]
[]
[]
[]
|
我们来分析一下,
- 第一次for循环,我们get的时候发现,pool里面是没有的,于是,执行了new函数,并且返回了一个空的字节切片。
- 后面的因为get到里面的空切片,没有走new这边。
修改示例
从上面可以看出,如果里面有值,是不会走到new这边,而是直接从pool里取值了。下面修改一下代码
1
2
3
4
5
6
7
8
9
10
11
12
13
| func TestPool(t *testing.T) {
pool := sync.Pool{
New: func() any {
fmt.Println("hhhh,new")
return []byte{}
},
}
for i := 0; i < 5; i++ {
val := pool.Get()
fmt.Println(val)
//pool.Put(val)
}
}
|
输出
1
2
3
4
5
6
7
8
9
10
| hhhh,new
[]
hhhh,new
[]
hhhh,new
[]
hhhh,new
[]
hhhh,new
[]
|
输出了5次,我们第一次取值出来用后,没有放进去,说明没有值的时候,走到New这边了。
再次验证
1
2
3
4
5
6
| for i := 0; i < 5; i++ {
val := pool.Get()
fmt.Println(val)
//pool.Put(val)
pool.Put(i)
}
|
输出
1
2
3
4
5
6
| hhhh,new
[]
0
1
2
3
|
可以看到,当我们第一次取出后,继续放值到pool中,就没有走到new,取的是上一次放进去的值。
- 有,就直接给你已经有的
- 没有,就走new,创建新的。
5.3 重置数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| type User struct {
ID int
Name string
}
func (u *User) Reset() {
u.ID = 0
u.Name = ""
}
func TestPool2(t *testing.T) {
pool := sync.Pool{
New: func() any {
return &User{}
},
}
u1 := pool.Get().(*User)
u1.ID = 12
u1.Name = "TOM"
u1.Reset()
pool.Put(u1)
u2 := pool.Get().(*User)
fmt.Println(u2)
}
|
重置pool的数据,是很常见的操作,上面代码的重置操作,在将u1拿出来后,操作完成了并没有直接放进pool中,而是将u1重置了。一开始我并不明白这样做是为什么,把数据清理了再放回去,放一个空数据回去有什么意义?下一次get的是空数据。
有这个疑问,说明我没有理解pool做出来的根本目的是什么?是为了避免频繁gc,每调用一次函数都声明一个buffer的话,会申请一段空间,函数退出之后就会被标记为垃圾,但是高并发下,会导致内存过大,内存回收不及时。所以我们就不要每次都申请一个buffer,而是取上次用过的buffer清空之后
,复用。它可能还会gc掉里面的buffer,更不应该用它存数据。
所以到这里,我理解错了,pool设计的初衷,是缓存和复用,缓解GC压力。
5.4 利用pool实现buffer池
Pool还是比较简单的,大多数情况下都可以直接使用。但是在一些场景下,我们需要更加精细的控制,那么就会尝试自己封装一下Pool。