目录

并发编程基础篇

并发编程
并发编程在当前软件领域是一个非常重要的概念,随着CPU等硬件的发展,我们无一例外的想让我们的程序运行的快一点、再快一点。Go语言在语言层面天生支持并发,充分利用现代CPU的多核优势,这也是Go语言能够大范围流行的一个很重要的原因。这一篇是我在学习老男孩B站视频及博客做的笔记,只能算是入门,后面会有更加深入的讨论

1. 基本概念

首先我们先来了解几个与并发编程相关的基本概念。

1.1 串行、并发与并行

  • 串行:我们都是先读小学,小学毕业后再读初中,读完初中再读高中。
  • 并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。
  • 并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

1.2 进程、线程和协程

  • 进程(process):程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
  • 线程(thread):操作系统基于进程开启的轻量级进程,是操作系统调度执行的最小单位。
  • 协程(coroutine):非操作系统提供而是由用户自行创建和控制的用户态‘线程’,比线程更轻量级。

1.3 并发模型

业界将如何实现并发编程总结归纳为各式各样的并发模型,常见的并发模型有以下几种:

  • 线程&锁模型
  • Actor模型
  • CSP模型
  • Fork&Join模型

Go语言中的并发程序主要是通过基于CSP(communicating sequential processes)的goroutine和channel来实现,当然也支持使用传统的多线程共享内存的并发方式。

关于CSP,后面会有文章来更深入介绍

2. goroutine

Goroutine 是 Go 语言支持并发的核心,在一个Go程序中同时创建成百上千个goroutine是非常普遍的,一个goroutine会以一个很小的栈开始其生命周期,一般只需要2KB。区别于操作系统线程由系统内核进行调度, goroutine 是由Go运行时(runtime)负责调度。例如Go运行时会智能地将 m个goroutine 合理地分配给n个操作系统线程,实现类似m:n的调度机制,不再需要Go开发者自行在代码层面维护一个线程池。

Goroutine 是 Go 程序中最基本的并发执行单元。每一个 Go 程序都至少包含一个 goroutine——main goroutine,当 Go 程序启动时它会自动创建。

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能-goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个 goroutine 去执行这个函数就可以了,就是这么简单粗暴。

Go语言中使用 goroutine 非常简单,只需要在函数或方法调用前加上go关键字就可以创建一个 goroutine ,从而让该函数或方法在新创建的 goroutine 中执行。

1
2
3
4
5
6
7
8
9

func hello() {
	fmt.Println("hello")

}
func TestBin1(t *testing.T) {
	go hello()
	fmt.Println("main goroutine over")
}

在 Go 程序启动时,Go 程序会为 main 函数创建一个默认的 goroutine 。在上面的代码中在 main 函数中使用 go 关键字创建了另外一个 goroutine 去执行 hello 函数,而此时 main goroutine 还在继续往下执行,程序中此时存在两个并发执行的 goroutine。当 main 函数结束时整个程序也就结束了,同时 main goroutine 也结束了,所有由 main goroutine 创建的 goroutine 也会一同退出。也就是说我们的 main 函数退出太快,另外一个 goroutine 中的函数还未执行完程序就退出了,导致未打印出“hello”。

输出如下结果,咦,hello函数中的打印怎么没有输出呢?

1
main goroutine over

在 Go 程序启动时,Go 程序会为 main 函数创建一个默认的 goroutine 。在上面的代码中在 main 函数中使用 go 关键字创建了另外一个 goroutine 去执行 hello 函数,而此时 main goroutine 还在继续往下执行,程序中此时存在两个并发执行的 goroutine。当 main 函数结束时整个程序也就结束了,同时 main goroutine 也结束了,所有由 main goroutine 创建的 goroutine 也会一同退出。也就是说我们的 main 函数退出太快,另外一个 goroutine 中的函数还未执行完程序就退出了,导致未打印出“hello”。

所以我们要想办法让 main 函数‘“等一等”将在另一个 goroutine 中运行的 hello 函数。其中最简单粗暴的方式就是在 main 函数中“time.Sleep”一秒钟了(这里的1秒钟只是我们为了保证新的 goroutine 能够被正常创建和执行而设置的一个值)。

按如下方式修改我们的示例代码。

1
2
3
4
5
func TestBin1(t *testing.T) {
	go hello()
	fmt.Println("main goroutine over")
	time.Sleep(time.Second)
}

此时输出结果

1
2
main goroutine over
hello

为什么会先打印main中的呢呢?这是因为在程序中创建 goroutine 执行函数需要一定的开销,而与此同时 main 函数所在的 goroutine 是继续执行的。

/并发编程/20230406183622.png

猜猜下面程序会打出什么
1
2
3
4
5
6
7
func main() {
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
}

2.1 goroutine示例

批量开goroutine
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func hello2(i int) {
	fmt.Println("hello", i)
}

func TestBin2(t *testing.T) {
    // 批量开启goroutine
	for i := 0; i <= 100000; i++ {
		go hello2(i)
	}
	fmt.Println("main")
	time.Sleep(time.Second)
}

这个程序虽然看起来数据很大,100000条,几乎不到6s中就运行完成了(电脑cpu性能越好,速度越快)。这就是并发的好处,也是Go的优势所在,如果是利用Python中的并发,是远远不可能有这个速度的。

2.2 goroutine与闭包

看看下面程序问题在哪?
1
2
3
4
5
6
7
func TestBin3(t *testing.T) {
	for i := 0; i < 100; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
}

/并发编程/20230406185504.png /并发编程/20230406220046.png

上面的程序依然有两个问题

  • 没有全部输出, 当然,这是因为太快,没来得及输出,如果把for循环次数改小,甚至会一个输出都看不到,main的goroutine太快,在最后加一个延时就可以解决;
  • 重复数据,比如我设置for循环100,打印很多100;设置10,打印出重复10等。
分析与修改

为什么会有重复的数据呢?匿名函数没有加参数时,i是共享的,是公共变量。比如我开启第一个协程,for循环继续了,i就变了。那么这个时候这个已经开启的goroutine打出来的不知道是已经循环到多少次的i了。所以才会重复。

它用的还是for循环的i,这个匿名函数的goroutine就开始了。当我们加了参数后,来看看上面问题解决了没有。

1
2
3
4
5
6
7
func TestBin4(t *testing.T) {
	for i := 0; i < 100; i++ {
		go func(i int) {
			fmt.Println(i)
		}(i)
	}
}
总结
加参数的协程没有同享main函数的i变量,go的函数参数是值拷贝,加了参数会拷贝一个副本,不加参数的都是用的i本身,每次循环i的地址都是一样的,导致最后都是一样的结果。当我们传参后,它已经传进去了,它不管外面的怎么变,它还是会打出传进去的那个值。现在理解上面重复打印的问题了吧。

2.3 sync.WaitGroup

上面两个程序是有问题的,第一个停顿明显,上面的程序运行时,打印完成后明显有一个停顿,第二个输出有重复,2.1 章节的例子循环次数改成100时,看的更清楚,会有重复的。现在我们来利用sync.WaitGroup去解决第一个问题

Go 语言中通过sync包为我们提供了一些常用的并发原语, sync 包中的WaitGroup,当你并不关心并发操作的结果或者有其它方式收集并发操作的结果时,WaitGroup是实现等待一组并发操作完成的好方法。

方法名功能
func (wg * WaitGroup) Add(delta int)计数器+delta,一般放在main里面线程开始前的位置+1
(wg *WaitGroup) Done()计数器-1 ,放在线程函数末尾-1
(wg *WaitGroup) Wait()阻塞直到计数器变为0,放在执行go 函数后面0
原理

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

下面的示例代码中我们在 main goroutine 中使用sync.WaitGroup来等待 hello goroutine 完成后再退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
var wg sync.WaitGroup

func hello3() {
	fmt.Println("hello")
	wg.Done() // 告知当前goroutine完成
}

func TestWait(t *testing.T) {
	wg.Add(1)
	go hello3()
	fmt.Println("main:hello")
	wg.Wait() //阻塞等待登记的goroutine完成
}

可以看到,这个就比最初使用time.sleep() 延时时顺畅许多。将代码编译后再执行,得到的输出结果和之前一致,但这一次程序不再有多余的停顿,hello goroutine 执行完毕后程序直接退出。

2.4 启动多个goroutine(重点)

在 Go 语言中实现并发就是这样简单,我们还可以启动多个 goroutine 。让我们再来看一个新的代码示例。这里同样使用了sync.WaitGroup来实现 goroutine 的同步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func helloPra(i int) {
	defer wg.Done()
	fmt.Println("hello", i)
}

func TestWait1(t *testing.T) {
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go helloPra(i)
	}
	wg.Wait()
}

多次执行上面的代码会发现每次终端上打印数字的顺序都不一致。这是因为10个 goroutine 是并发执行的,而 goroutine 的调度是随机的。

2.5 GOMAXPROCS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func a() {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		time.Sleep(time.Nanosecond)
		fmt.Printf("A:%d\n", i)
	}
}

func b() {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		time.Sleep(time.Nanosecond)
		fmt.Printf("B:%d\n", i)
	}
}

func TestGoMax(t *testing.T) {
	runtime.GOMAXPROCS(6)
	wg.Add(2)
	go a()
	go b()
	wg.Wait()
}
分析
  • 如果不加纳秒的延时,在100这个数据量下,是a线程执行完了,再执行b线程,或者是先b后a,是观察不到a,b交替执行的。因为太快了,线程a开启后,开启线程b的时候,a有可能执行完成,看起来是ab有先后顺序。事实上,数据量大的时候即便不加延时也可看出来。

  • 加了纳秒的延时是可以看到的两个线程交替输出的,同一个时刻,这两个线程是都在跑的。而不是a跑完了等b,也不是b跑完了等a

  • 可以把两个for循环的数据量放大到10w,也是可以看到交替打印的。这是视频中演示的一个例子,视频里却说在mac或者Linux下演示有效果,在windows下没有效果,于是我编译了一下扔到Linux去跑,发现还是这样,也就是说,视频里讲错了,他讲的是不对的。能发现错误,说明自己的水平在一点点提高。观察不到效果还是因为Go真的太快了!数据量大到一定程度或者是加入纳秒的延时就可以看出来了。

3. channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,很多并发模型中必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言采用的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说 goroutine 是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

3.1 channel类型的声明

channel是 Go 语言中一种特有的类型。声明通道类型变量的格式如下:

chan的声明
1
var 变量名称 chan 元素类型

其中:

  • chan:是关键字
  • 元素类型:是指通道中传递元素的类型

例如:

1
2
3
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

3.2 channel的初始化

初始值

未初始化的通道类型变量其默认零值是nil

1
2
3
4
func TestChan1(t *testing.T) {
	var ch chan int
	fmt.Printf("%T %v\n", ch, ch) // chan int  nil
}
初始化

声明的通道类型变量需要使用内置的make函数初始化之后才能使用。具体格式如下:

1
make(chan 元素类型, [缓冲大小])

其中:

  • channel的缓冲大小是可选的。

例如:

1
2
ch4 := make(chan int)
ch5 := make(chan bool, 1)  // 声明一个缓冲区大小为1的通道

到目前,make初始化切片、map、channel都已经学完了,这些内容很重要,后面复习一下,深入理解。channel初始化后才能获得内存地址。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func TestChan2(t *testing.T) {
	var ch chan int
	fmt.Println(ch)   //nil

	ch = make(chan int, 1)
	fmt.Println(ch)   // 0xc00001a230
	
	//fmt.Println(<-ch) 死锁,从nil的chan里读数据会死锁
	ch <- 10
	fmt.Println(ch)  // 0xc00001a230
}

3.3 channel 操作

通道共有发送(send)接收(receive)关闭(close)三种操作。而发送和接收操作都使用<-符号。现在使用以下语句定义一个通道:

1
ch := make(chan int)

发送 : 将一个值发送到通道中:

1
ch <- 10 // 把10发送到ch中

接收 : 从一个通道中接收值

1
2
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

关闭:我们通过调用内置的close函数来关闭通道

1
close(ch)
注意点

一个通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致 panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致 panic。

3.4 无缓冲channel

无缓冲的通道又称为阻塞的通道。我们来看一下如下代码片段。

1
2
3
4
5
func TestChan3(t *testing.T) {
	ch := make(chan int)
	ch <- 10
	fmt.Println("send")
}
报错

上面这段代码能够通过编译,但执行的时候会出现以下错误:

1
2
3
4
5
6
7
fatal error: all goroutines are asleep - deadlock! 
                                                   
goroutine 1 [chan send]:                           
main.main()                                        
        E:/go/gotest/src/practice.go:7 +0x31       

进程 已完成,退出代码为 2

deadlock表示我们程序中的 goroutine 都被挂起导致程序死锁了。为什么会出现deadlock错误呢?因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理,如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。就像田径比赛中的4x100接力赛,想要完成交棒必须有一个能够接棒的运动员,否则只能等待。简单来说就是无缓冲的通道必须有至少一个接收方才能发送成功。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}

func TestChan4(t *testing.T) {
	ch := make(chan int)
	go recv(ch)
	ch <- 10
	fmt.Println("sended")
}

/并发编程/20230407211300.png

分析
  • 首先无缓冲通道ch上的发送操作会阻塞,直到另一个 goroutine 在该通道上执行接收操作,这时数字10才能发送成功,两个 goroutine 将继续执行。

  • 相反,如果接收操作先执行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向该通道发送数字10。

  • 使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道

3.5 有缓冲类型channel

还有另外一种解决上面死锁问题的方法,那就是使用有缓冲区的通道。我们可以在使用 make 函数初始化通道时,可以为其指定通道的容量,例如:

1
2
3
4
5
6
func TestChan5(t *testing.T) {
	ch := make(chan int, 1)
	ch <- 10
	fmt.Println("send")
	// ch <- 12   //dead lock
}
分析
  • 只要通道的容量大于零,那么该通道就属于有缓冲的通道,通道的容量表示通道中最大能存放的元素数量。
  • 当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
  • 我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,但我们很少会这么做。

3.6 多返回值模式

当向通道中发送完数据时,我们可以通过close函数来关闭通道。当一个通道被关闭后,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值。通道内的值被接收完后再对通道执行接收操作得到的值会一直都是对应元素类型的零值。那我们如何判断一个通道是否被关闭了呢?

对一个通道执行接收操作时支持使用如下多返回值模式。

1
value, ok := <- ch

其中:

  • value:从通道中取出的值,如果通道被关闭则返回对应类型的零值。
  • ok:通道ch关闭时返回 false,否则返回 true。
订正

上面说法错误,查了源码解释,是不仅要关闭,且为空才是false

1
2
3
4
//	x, ok := <-c
//
// will also set ok to false for a closed and empty channel.
func close(c chan<- Type)

我测试了一下,确实是这样,两个条件同时满足才是false。为空的时候不大好测试,因为会直接deadlock。重点还是在

1
2
3
4
5
6
ch := make(chan int, 2)
ch <- 1
x, ok := <-ch
close(ch)
fmt.Println(x, ok) // true
ch <- 10 // panic
利用多返回值判断chan是否关闭

下面代码片段中的f函数会循环从通道ch中接收所有值,直到通道被关闭后退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func f(ch chan int) {
	for {
		v, ok := <-ch
		if !ok {
			fmt.Printf("通道已关闭")
			break
		}
		fmt.Printf("value:%#v  ok:%#v\n", v, ok)
	}
}
func TestChan21(t *testing.T) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	f(ch)
}

输出

1
2
3
value:1  ok:true
value:2  ok:true
通道已关闭
  • 上面程序在主程序中定义了一个有缓冲的chan,并且往里面发送了两个值后关闭了通道。
  • f函数用无限循环来从ch中读取数据。
  • 利用多返回值判断chan中还有没有数据

3.7 for range 接收值

通常我们会选择使用for range循环从通道中接收值,当通道被关闭后,会在通道内的所有值被接收完毕后会自动退出循环。上面那个示例我们使用for range改写后会很简洁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func f2(c chan int) {
	for v := range c {
		fmt.Println(v)
	}
}

func TestChan22(t *testing.T) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	f2(ch)
}

3.8 单向channel

在某些场景下我们可能会将通道作为参数在多个任务函数间进行传递,通常我们会选择在不同的任务函数中对通道的使用进行限制,比如限制通道在某个函数中只能执行发送或只能执行接收操作。想象一下,我们现在有Producer和Consumer两个函数,其中Producer函数会返回一个通道,并且会持续将符合条件的数据发送至该通道,并在发送完成后将该通道关闭。而Consumer函数的任务是从通道中接收值进行计算,这两个函数之间通过Processer函数返回的通道进行通信。完整的示例代码如下。

分析
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func Producer() chan int {
	ch := make(chan int, 2)
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch)
	}()
	return ch
}

func Consumer(ch chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func TestChan31(t *testing.T) {
	ch := Producer()
	res := Consumer(ch)
	fmt.Println(res)
}

上面的demo很简单,Producer往chan里发送数据,Consumer从chan里取数据进行求和操作。

从上面的示例代码中可以看出正常情况下Consumer函数中只会对通道进行接收操作,但是这不代表不可以在Consumer函数中对通道进行发送操作。作为Producer函数的提供者,我们在返回通道的时候可能只希望调用方拿到返回的通道后只能对其进行接收操作。但是我们没有办法阻止在Consumer函数中对通道进行发送操作。

Go语言中提供了**单向通道**来处理这种需要限制通道只能进行某种操作的情况。

1
2
<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收

其中,箭头<-和关键字chan的相对位置表明了当前通道允许的操作,这种限制将在编译阶段进行检测。另外对一个只接收通道执行close也是不允许的,因为默认通道的关闭操作应该由发送方来完成。我们使用单向通道将上面的示例代码进行如下改造。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func Producer2() <-chan int {
	ch := make(chan int, 2)
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch)
	}()
	return ch
}

func Consumer2(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func TestChan41(t *testing.T) {
	ch := Producer()
	sum := Consumer(ch)
	fmt.Println(sum)
}

这一次,Producer函数返回的是一个只接收通道,这就从代码层面限制了该函数返回的通道只能进行接收操作,保证了数据安全。很多读者看到这个示例可能会觉着这样的限制是多余的,但是试想一下如果Producer函数可以在其他地方被其他人调用,你该如何限制他人不对该通道执行发送操作呢?并且返回限制操作的单向通道也会让代码语义更清晰、更易读。

3.9 双向chan转单向chan

在函数传参及任何赋值操作中全向通道(正常通道)可以转换为单向通道,但是无法反向转换。接上面代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func TestChan42(t *testing.T) {
	var ch1 = make(chan int, 1)
	ch1 <- 10
	close(ch1)
	// 函数传参时将ch3转为单向通道
	res := Consumer2(ch1)
	fmt.Println(res)

	ch2 := make(chan int, 1)
	ch2 <- 12
	//声明一个只接收通道
	var ch3 <-chan int
	// 赋值时转为单向通道
	ch3 = ch2
	v := <-ch3
	fmt.Println(v)
}

4. select

无限循环操作channel

在某些场景下我们可能需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。你也许会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值。

1
2
3
4
5
6
7
for{
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
    
}

这种方式虽然可以实现从多个通道接收值的需求,但是程序的运行性能会差很多。Go 语言内置了select关键字,使用它可以同时响应多个通道的操作。并且这样的写法会引发报错。所以不推荐这么写

select用法

Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。具体格式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
select {
case <-ch1:
	//...
case data := <-ch2:
	//...
case ch3 <- 10:
	//...
default:
	//默认操作
}

Select 语句具有以下特点。

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个执行。
  • 对于没有 case 的 select 会一直阻塞可用于阻塞 main 函数,防止退出
select使用示例

下面的示例代码能够在终端打印出10以内的奇数,我们借助这个代码片段来看一下 select 的具体使用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func TestSelect(t *testing.T) {
	ch := make(chan int, 1)
	for i := 1; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

输出结果

1
2
3
4
5
1
3
5
7
9

示例中的代码首先是创建了一个缓冲区大小为1的通道 ch,进入 for 循环后:

  • 第一次循环时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以x := <-ch 这个 case 分支不满足,而ch <- i这个分支可以执行,会把1发送到通道中,结束本次 for 循环;
  • 第二次 for 循环时,i = 2,由于通道缓冲区已满,所以ch <- i这个分支不满足,而x := <-ch这个分支可以执行,从通道接收值1并赋值给变量 x ,所以会在终端打印出 1;
  • 后续的 for 循环以此类推会依次打印出3、5、7、9。
无限循环注意default分支退出循环

不知道循环次数,需要使用无限循环,也是很常见的写法,注意default分支要退出循环,不然会阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func TestChan23(t *testing.T) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
LOOP:
	for {
		select {
		case x, ok := <-ch:
			fmt.Println(x, ok)
		default:
			break LOOP
		}
	}
}
利用select阻塞程序

修改了一下上面的demo,运行这个程序,会阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func TestZuse(t *testing.T) {  
   ch := make(chan int, 1)  
   ch <- 1  
   for {  
      select {  
      case x, ok := <-ch:  
         fmt.Println(x, ok)  
      default:  
      }  
   }  
}

debug一下,看看阻塞在哪里了,第一次for循环

/并发编程/20230416212421.png

继续往下走,发现程序阻塞在了这里,无论怎么走,它始终停在了这里,这是因为它一直在等待发送数据,有值才能取出来,需要注意的是,这里的default不能缺少,否则直接deadlock

/并发编程/20230416212615.png

5. 并发安全和锁

有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态)。这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。我们用下面的代码演示一个数据竞争的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
var (
	x int64
)

func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}

func TestMutex(t *testing.T) {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

我们将上面的代码编译后执行,不出意外每次执行都会输出诸如9537、5865、6527等不同的结果。这是为什么呢?(当然如果电脑性能足够好,也是会输出10000的,我试了几次)。

在上面的示例代码片中,我们开启了两个 goroutine 分别执行 add 函数,这两个 goroutine 在访问和修改全局的x变量时就会存在数据竞争,某个 goroutine 中对全局变量x的修改可能会覆盖掉另一个 goroutine 中的操作,所以导致最后的结果与预期不符。

报错
上面的demo是我去年刚学习的时候敲的,我看了notion记录的是wg.add(1),这是一个很明显的错误。在这次整理博客的过程中,也发现自己之前敲的代码一些错误,边整理博客边复习。看看在这条路上,我能走多远。

5.1 互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。sync.Mutex提供了两个方法供我们使用。

方法名功能
func (m *Mutex) Lock()获取互斥锁
func (m *Mutex) Unlock()释放互斥锁
互斥锁
1
2
3
m  sync.Mutex     // 互斥锁
m.Lock()          //  修改x前假锁
m.Unlock()       // 改完解锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func addWithLock() {
	for i := 0; i < 10000; i++ {
		m.Lock()
		x = x + 1
		m.Unlock()
	}
	//fmt.Println(x)
	wg.Done()

}
func TestMutex(t *testing.T) {
	wg.Add(2)
	go addWithLock()
	go addWithLock()
	wg.Wait()
	fmt.Println(x) // 此时
}

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

这样,最后的输出是不会变动的,可以分析一下,比如第一个线程执行到600,锁住了,释放完,变成601。这时不管第一个还是第二个goroutine拿到601,接着锁住,再加1,再释放,交给下一个goroutine(可能是当前goroutine,也可能是另外一个),这样,虽然是在两个goroutine里面进行,但是总的和是不变的, 如果不加锁,两个goroutine是可以同时操作x的。

5.2 读写互斥锁

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

方法名作用
func (rw *RWMutex) Lock()获取写锁
func (rw *RWMutex) Unlock()释放写锁
func (rw *RWMutex) RLock()获取读锁
func (rw *RWMutex) RUnlock()释放读锁
func (rw *RWMutex) RLocker() Locker返回一个实现Locker接口的读写锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
var (
	mutex   sync.Mutex
	rwMutex sync.RWMutex
)

// 使用互斥锁进行的写操作
func writeWithLock() {
	mutex.Lock()
	x = x + 1
	time.Sleep(10 * time.Millisecond)
	mutex.Unlock()
	wg.Done()
}

// 使用互斥锁的读操作
func readWithLock() {
	mutex.Lock()
	time.Sleep(time.Millisecond)
	mutex.Unlock()
	wg.Done()
}

// 使用读写互斥锁的写操作
func writeWithRwLock() {
	rwMutex.Lock()
	x = x + 1
	time.Sleep(10 * time.Millisecond)
	rwMutex.Unlock()
	wg.Done()
}

// 使用读写互斥锁的读操作
func readWithRwLock() {
	rwMutex.RLock()
	time.Sleep(time.Millisecond)
	rwMutex.RUnlock()
	wg.Done()
}

func do(wf, rf func(), wc, rc int) {
	start := time.Now()
	// wc个并发写操作
	for i := 0; i < wc; i++ {
		wg.Add(1)
		go wf()
	}

	// rc个并发读操作
	for i := 0; i < rc; i++ {
		wg.Add(1)
		go rf()
	}
	wg.Wait()
	cost := time.Since(start)
	fmt.Printf("x:%v const:%v\n", x, cost)
}

func TestRW(t *testing.T) {
	do(writeWithRwLock, readWithRwLock, 10, 100)
}

func TestWithoutRw(t *testing.T) {
	do(writeWithLock, readWithLock, 10, 100)
}

输出

1
2
3
4
5
6
7
=== RUN   TestRW
x:10 const:167.9521ms
--- PASS: TestRW (0.17s)
=== RUN   TestWithoutRw
x:20 const:1.7118294s
--- PASS: TestWithoutRw (1.71s)
PASS
分析
  • 从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能
  • 不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来
  • 后面可以参考benchmark进行测试
  • 这个程序加延时只是为了看的更明显,可以把延时取消的,执行的更快!