go语言并发编程
并 发 编 程
优雅的并发编程范式,完善的并发支持,出色的并发性能是Go语言区别于其他语言的一大特色
并发基础
并发程序的几条优点,让大家认识到并发势在必行:
并发能更客观地表现问题模型;
并发可以充分利用CPU核心的优势,提高程序的执行效率;
并发能充分利用CPU与其他硬件设备固有的异步性。
现在我们已经意识到并发的好处了,那么到底有哪些方式可以实现并发执行呢?就目前而言,并发包含以下几种主流的实现模型。
** 多进程**。多进程是在操作系统层面进行并发的基本模式。同时也是开销最大的模式。在Linux平台上,很多工具链正是采用这种模式在工作。比如某个Web服务器,它会有专门的进程负责网络端口的监听和链接管理,还会有专门的进程负责事务和运算。这种方法的好处在于简单、进程间互不影响,坏处在于系统开销大,因为所有的进程都是由内核管理的。
多线程。多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多的最有效的一种模式。目前,我们所见的几乎所有工具链都会使用这种模式。它比多进程的开销小很多,但是其开销依旧比较大,且在高并发模式下,效率会有响。
基于回调的非阻塞/异步IO。这种架构的诞生实际上来源于多线程模式的危机。在很多高并发服务器开发实践中,使用多线程模式会很快耗尽服务器的内存和CPU资源。而这种模式通过事件驱动的方式使用异步IO,使服务器持续运转,且尽可能地少用线程,降低开销,它目前在Node.js中得到了很好的实践。但是使用这种模式,编程比多线程要复杂,因为它把流程做了分割,对于问题本身的反应不够自然。
协程。协程(Coroutine)本质上是一种用户态线程,不需要操作系统来进行抢占式调度,且在真正的实现中寄存于线程中,因此,系统开销极小,可以有效提高线程的任务并发性,而避免多线程的缺点。使用协程的优点是编程简单,结构清晰;缺点是需要语言的支持,如果不支持,则需要用户在程序中自行实现调度器。目前,原生支持协程的语言还很少。
人的思维模式可以认为是串行的,而且串行的事务具有确定性。线程类并发模式在原先的确定性中引入了不确定性,这种不确定性给程序的行为带来了意外和危害,也让程序变得不可控。
线程之间通信只能采用共享内存的方式。为了保证共享内存的有效性,我们采取了很多措施,比如加锁等,来避免死锁或资源竞争。
我们可以将之前的线程加共享内存的方式归纳为“共享内存系统”,虽然共享内存系统是一种有效的并发模式,但它也暴露了众多使用上的问题。计算机科学家们在近40年的研究中又产生了一种新的系统模型,称为“消息传递系统”。对线程间共享状态的各种操作都被封装在线程之间传递的消息中,这通常要求:发送消息时对状态进行复制,并且在消息传递的边界上交出这个状态的所有权。从逻辑上来看,这个操作与共享内存系统中执行的原子更新操作相同,但从物理上来看则非常不同。由于需要执行复制操作,所以大多数消息传递的实现在性能上并不优越,但线程中的状态管理工作通常会变得更为简单。
协程
执行体是个抽象的概念,在操作系统层面有多个概念与之对应,比如操作系统自己掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine,也叫轻量级线程)。与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。这也是协程也叫轻量级线程的
原因。
多数语言在语法层面并不直接支持协程,而是通过库的方式支持,但用库的方式支持的功能也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。如果在这样的轻量级线程中
调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程,从而无法真正达到轻量级线程本身期望达到的目标。
Go 语言在语言级别支持轻量级线程,叫goroutine。Go 语言标准库提供的所有系统调用操作(当然也包括所有同步 IO 操作),都会出让 CPU 给其他goroutine。这让事情变得非常简单,让轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于CPU的核心数量。
goroutine
goroutine是Go语言中的轻量级线程实现,由Go运行时(runtime)管理。
假设我们需要实现一个函数Add(),它把两个参数相加,并将结果打印到屏幕上,具体代码如下:
func Add(x, y int) {
z := x + y
fmt.Println(z)
}
那么,如何让这个函数并发执行呢?具体代码如下:
go Add(1, 1)
在一个函数调用前加上go关键字,这次调用就会在一个新的goroutine中并发执行。当被调用的函数返回时,这个goroutine也自动结束了。需要注意的是,如果这个函数有返回值,那么这个
返回值会被丢弃。
package main
import "fmt"
func Add(x, y int) {
z := x + y
fmt.Println(z)
}
func main() {
for i := 0; i < 10; i++ {
go Add(i, i)
}
}
在上面的代码里,我们在一个for循环中调用了10次Add()函数,它们是并发执行的。可是当你编译执行了上面的代码,就会发现一些奇怪的现象:
“什么?!屏幕上什么都没有,程序没有正常工作!”
是什么原因呢?明明调用了10次Add(),应该有10次屏幕输出才对。要解释这个现象,就涉及Go语言的程序执行机制了。
Go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出,且程序并不等待其他goroutine(非主goroutine)结束。
对于上面的例子,主函数启动了10个goroutine,然后返回,这时程序就退出了,而被启动的执行Add(i, i)的goroutine没有来得及执行,所以程序没有任何输出。
并发通信
关键字go的引入使得在Go语言中并发编程变得简单而优雅,但我们同时也应该意识到并发编程的原生复杂性,并时刻对并发中容易出现的问题保持警惕。
并发单元间的通信是最大的问题。
在工程上,有两种最常见的并发通信模型:共享数据和消息。
共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无
疑是内存了,也就是常说的共享内存。
先看看我们在C语言中通常是怎么处理线程间数据共享的
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void *count();
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int counter = 0;
main()
{
int rc1, rc2;
pthread_t thread1, thread2;
/* 创建线程,每个线程独立执行函数functionC */
if((rc1 = pthread_create(&thread1, NULL, &add, NULL)))
{
printf("Thread creation failed: %d\n", rc1);
}
if((rc2 = pthread_create(&thread2, NULL, &add, NULL)))
{
printf("Thread creation failed: %d\n", rc2);
}
/* 等待所有线程执行完毕 */
pthread_join( thread1, NULL);
pthread_join( thread2, NULL);
exit(0);
}
void *count()
{
pthread_mutex_lock( &mutex1 );
counter++;
printf("Counter value: %d\n",counter);
pthread_mutex_unlock( &mutex1 );
}
现在我们尝试将这段C语言代码直接翻译为Go语言代码
package main
import "fmt"
import "sync"
import "runtime"
var counter int = 0
func Count(lock *sync.Mutex) {
lock.Lock()
counter++
fmt.Println(z)
lock.Unlock()
}
func main() {
lock := &sync.Mutex{}
for i := 0; i < 10; i++ {
go Count(lock)
}
for {
lock.Lock()
c := counter
lock.Unlock()
runtime.Gosched()
if c >= 10 {
break
}
}
}
在上面的例子中,我们在10个goroutine中共享了变量counter。每个goroutine执行完成后,将counter的值加1。因为10个goroutine是并发执行的,所以我们还引入了锁,也就是代码中的lock变量。每次对n的操作,都要先将锁锁住,操作完成后,再将锁打开。在主函数中,使用for循环来不断检查counter的值(同样需要加锁)。当其值达到10时,说明所有goroutine都执行完毕了,这时主函数返回,程序退出。事情好像开始变得糟糕了。实现一个如此简单的功能,却写出如此臃肿而且难以理解的代码。
Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。
消息机制认为每个并发单元是自包含的、独立的个体,并且都有自己的变量,但在不同并发单元间这些变量不共享。每个并发单元的输入和输出只有一种,那就是消息。这有点类似于进程的概念,每个进程不会被其他进程打扰,它只做好自己的工作就可以了。不同进程间靠消息来通信,它们不会共享内存。
channel
channel是Go语言在语言级别提供的goroutine间的通信方式。我们可以使用channel在两个或多个goroutine之间传递消息。channel是进程内的通信方式,因此通过channel传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。
channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。如果对Unix管道有所了解的话,就不难理解channel,可以将其认为是一种类型安全的管道。
在了解channel的语法前,我们先看下用channel的方式重写上面的例子是什么样子的,以此
package main
import "fmt"
func Count(ch chan int) {
ch <- 1
fmt.Println("Counting")
}
func main() {
chs := make([]chan int, 10)
for i := 0; i < 10; i++ {
chs[i] = make(chan int)
go Count(chs[i])
}
for _, ch := range(chs) {
<-ch
}
}
在这个例子中,我们定义了一个包含10个channel的数组(名为chs),并把数组中的每个channel分配给10个不同的goroutine。在每个goroutine的Add()函数完成后,我们通过ch <- 1语句向对应的channel中写入一个数据。在这个channel被读取前,这个操作是阻塞的。在所有的goroutine启动完成后,我们通过<-ch语句从10个channel中依次读取数据。在对应的channel写入数据前,这个操作也是阻塞的。
基本语法
一般channel的声明形式为:var chanName chan ElementType 与一般的变量声明不同的地方仅仅是在类型之前加了chan关键字。ElementType指定这个channel所能传递的元素类型。举个例子,我们声明一个传递类型为int的channel:
var ch chan int
或者,我们声明一个map,元素是bool型的channel:
var m map[string] chan bool
定义一个channel也很简单,直接使用内置的函数make()即可:
ch := make(chan int)
这就声明并初始化了一个int型的名为ch的channel。
在channel的用法中,最常见的包括写入和读出。将一个数据写入(发送)至channel的语法很直观,如下:
ch <- value
向channel写入数据通常会导致程序阻塞,直到有其他goroutine从这个channel中读取数据。从channel中读取数据的语法是
value := <-ch
如果channel之前没有写入数据,那么从channel中读取数据也会导致程序阻塞,直到channel中被写入数据为止。我们之后还会提到如何控制channel只接受写或者只允许读取,即单向channel。
select
早在Unix时代,select机制就已经被引入。通过调用select()函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回。Go语言直接在语言级别支持select关键字,用于处理异步IO问题。
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。与switch语句可以选择任何可使用相等比较的条件相比,select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
可以看出,select不像switch,后面并不带判断条件,而是直接去查看case语句。每个case语句都必须是一个面向channel的操作。比如上面的例子中,第一个case试图从chan1读取
一个数据并直接忽略读到的数据,而第二个case则是试图向chan2中写入一个整型数1,如果这两者都没有成功,则到达default语句。
缓冲机制
之前我们示范创建的都是不带缓冲的channel,这种做法对于传递单个数据的场景可以接受,但对于需要持续传输大量数据的场景就有些不合适了。接下来我们介绍如何给channel带上缓冲,
从而达到消息队列的效果。
要创建一个带缓冲的channel,其实也非常容易:
c := make(chan int, 1024)
在调用make()时将缓冲区大小作为第二个参数传入即可,比如上面这个例子就创建了一个大小为1024的int类型channel,即使没有读取方,写入方也可以一直往channel里写入,在缓冲区被
填完之前都不会阻塞。
从带缓冲的channel中读取数据可以使用与常规非缓冲channel完全一致的方法,但我们也可以使用range关键来实现更为简便的循环读取:
for i := range c {
fmt.Println("Received:", i)
}
超时机制
在并发编程的通信过程中,最需要处理的就是超时问题,即向channel写数据时发现channel已满,或者从channel试图读取数据时发现channel为空。如果不正确处理这些情况,很可能会导
致整个goroutine锁死。
使用channel时需要小心,比如对于以下这个用法:
i := <-ch
不出问题的话一切都正常运行。但如果出现了一个错误情况,即永远都没有人往ch里写数据,那么上述这个读取动作也将永远无法从ch中读取到数据,导致的结果就是整个goroutine永远阻塞并
没有挽回的机会。如果channel只是被同一个开发者使用,那样出问题的可能性还低一些。但如果一旦对外公开,就必须考虑到最差的情况并对程序进行保护。
Go语言没有提供直接的超时处理机制,但我们可以利用select机制。虽然select机制不是专为超时而设计的,却能很方便地解决超时问题。因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。
基于此特性,我们来为channel实现超时机制:
// 首先,我们实现并执行一个匿名的超时等待函数
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // 等待1秒钟
timeout <- true
}()
// 然后我们把timeout这个channel利用起来
select {
case <-ch:
// 从ch中读取到数据
case <-timeout:
// 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
这样使用select机制可以避免永久等待的问题,因为程序会在timeout中获取到一个数据后继续执行,无论对ch的读取是否还处于等待状态,从而达成1秒超时的效果。这种写法看起来是一个小技巧,但却是在Go语言开发中避免channel通信超时的最有效方法。
channel的传递
传递的数据只是一个整型数,在实际的应用场景中这通常会是一个数据块。
首先限定基本的数据结构:
type PipeData struct {
value int
handler func(int) int
next chan int
}
然后我们写一个常规的处理函数。我们只要定义一系列PipeData的数据结构并一起传递给这个函数,就可以达到流式处理数据的目的:
func handle(queue chan *PipeData) {
for data := range queue {
data.next <- data.handler(data.value)
}
}
这里我们只给出了大概的样子,限于篇幅不再展开。
单向channel
顾名思义,单向channel只能用于发送或者接收数据。channel本身必然是同时支持读写的,否则根本没法用。假如一个channel真的只能读,那么肯定只会是空的,因为你没机会往里面写数据。同理,如果一个channel只允许写,即使写进去了,也没有丝毫意义,因为没有机会读取里面的数据。所谓的单向channel概念,其实只是对channel的一种使用限制。
我们在将一个channel变量传递到一个函数时,可以通过将其指定为单向channel变量,从而限制该函数中可以对此channel的操作,比如只能往这个channel写,或者只能从这个channel读。
单向channel变量的声明非常简单,如下:
`
var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据var ch3 <-chan int // ch3是单向channel,只用于读取int数据
channel的意义:就是在单向channel和双向channel之间进行转换。示例如下:
ch4 := make(chan int)
ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel
基于ch4,我们通过类型转换初始化了两个单向channel:单向读的ch5和单向写的ch6。
关闭channel
关闭channel非常简单,直接使用Go语言内置的close()函数即可:
close(ch) 在介绍了如何关闭channel之后,我们就多了一个问题:如何判断一个channel是否已经被关闭?我们可以在读取的时候使用多重返回值的方式:
x, ok := <-ch
多核并行化
在执行一些昂贵的计算任务时,我们希望能够尽量利用现代服务器普遍具备的多核特性来尽量将任务并行化,从而达到降低总计算时间的目的。此时我们需要了解CPU核心的数量,并针对性地分解计算任务到多个goroutine中去并行运行。
下面我们来模拟一个完全可以并行的计算任务:计算N个整型数的总和。我们可以将所有整型数分成M份,M即CPU的个数。让每个CPU开始计算分给它的那份计算任务,最后将每个CPU的计算结果再做一次累加,这样就可以得到所有N个整型数的总和:
type Vector []float64
// 分配给每个CPU的计算任务
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // 发信号告诉任务管理者我已经计算完成了
}
const NCPU = 16 // 假设总共有16核
func (v Vector) DoAll(u Vector) {
c := make(chan int, NCPU) // 用于接收每个CPU的任务完成信号
for i := 0; i < NCPU; i++ {
go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
}
// 等待所有CPU的任务完成
for i := 0; i < NCPU; i++ {
<-c // 获取到一个数据,表示一个CPU计算完成了
}
// 到这里表示所有计算已经结束
}
这两个函数看起来设计非常合理。DoAll()会根据CPU核心的数目对任务进行分割,然后开辟多个goroutine来并行执行这些计算任务。
同步
Go语言的设计者虽然对channel有极高的期望,但也提供了妥善的资源锁方案。
同步锁
Go语言包中的sync包提供了两种锁类型:sync.Mutex和sync.RWMutex。Mutex是最简单的一种锁类型,同时也比较暴力,当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等到这个goroutine释放该Mutex。RWMutex相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()方法;而写锁(调用Lock()方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine独占。从RWMutex的实现看,RWMutex类型其实组合了Mutex:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
对于这两种锁类型,任何一个Lock()或RLock()均需要保证对应有Unlock()或RUnlock()调用与之对应,否则可能导致等待该锁的所有goroutine处于饥饿状态,甚至可能导致死锁。锁的典型使用模式如下:
var l sync.Mutex
func foo() {
l.Lock()
defer l.Unlock()
//...
}
全局唯一性操作
对于从全局的角度只需要运行一次的代码,比如全局初始化操作,Go语言提供了一个Once类型来保证全局的唯一性操作,具体代码如下:
var a string
var once sync.Once
func setup() {
a = "hello, world"
}
func doprint() {
once.Do(setup)
print(a)
}
func twoprint() {
go doprint()
go doprint()
}
- 点赞
- 收藏
- 关注作者
评论(0)