在go语言中处理竞争
简介
在Go语言中,每一个并发的执行单元叫作一个goroutine。设想这里的一个程序有两个函数,
一个函数做计算,另一个输出结果,假设两个函数没有相互之间的调用关系。一个线性的程序会先调用其中的一个函数,然后再调用另一个。如果程序中包含多个goroutine,对两个函
数的调用则可能发生在同一时刻。
如果你使用过操作系统或者其它语言提供的线程,那么你可以简单地把goroutine类比作一个线程,尽管两者的区别实际上只是一个量的区别,但量变会引起质变的道理同样适用于goroutine和线程。
其中的明显而微妙区别在于,OS中的线程通常分配2MB的内存块作为栈,而一个goroutine使用动态栈的概念,开始后初始大小只有2KB,这比OS的线程小得多。
并且与操作系统的栈一样,将保存程序中活跃或挂起的函数调用本地变量。不同的是goroutine是动态,初始化2KB,最大能达到1GB,比固定大小的线程大得多。不过一般情况下是不会达到最大值的。
在go程序中同时创建成百上千个goroutine是非常普遍的,最多能达到百万级,比如在sync包中,经常使用WaitGroup,这一个只是用来等待并关闭channel的goroutine,几乎不占内存空间。
2 sync 同步包互斥锁
sync同步包 在 src/sync/ 路径,在其中有这样的提示:
不应该复制哪些包含了此包中类型的值。
禁止复制首次使用后的Mutex
禁止复制使用后的RWMutex
禁止复制使用后的Cond
互斥在某些独占的数据操作中很实用,而且被sync包里的Mutex类型直接支持。
它的Lock方法能够获取到token(这里叫锁),并且Unlock方法会释放这个token。
例子1: 复制互斥锁
type mutn struct {
n int
sync.Mutex
}
func DoMutn() {
f := mutn{n: 19}
go func(f mutn) {
for {
log.Println("goto: try to lock mutn")
f.Lock()
log.Println("goto: lock mutn ok.")
time.Sleep(3 * time.Second)
f.Unlock()
log.Println("goto:unlock ok")
}
}(f)
f.Lock()
log.Println("goto:lock mutn ok.")
Mutex 互斥锁首次使用后复制其值
go func(f mutn) {
for {
log.Println("gr3: try to lock...")
f.Lock()
log.Println("gr3: lock mutn ok.")
time.Sleep(5 * time.Second)
f.Unlock()
log.Println("gr3:unlock ok")
}
}(f)
time.Sleep(20 * time.Second)
f.Unlock()
log.Println("g1:unlock ok")
}
func main() {
DoMutn()
}
我们在示例中创建两个goroutine,g2和g3,示例运行的结果显示: gr3阻塞在加锁操作。
而g2按预期运行,g2和g3的区别在于g2是互斥锁首次使用之前创建的。
而g3则是在互斥锁执行完加锁操作并处于锁定状态时创建的。
并且程序在创建g3时复制了 mutn实例。 该实例包括了sync.Mutex实例。
在标准库互斥锁Mutex的定义非常简单,它有两个字段 state,sema组成
type Mutex struct {
state int32
sema uint32
}
这两个字段表示
state 表示当前互斥锁状态。
sema 用于控制锁状态信号量。
对mutex实例的复制即是对两个整型字段的复制。 在初始状态,Mutex实例处于 Unlocked状态,state和sema都为0.
g2复制了处于初始化的Mutex实例,副本的state和sema为0, 这与g2自定义一个新的Mutex实例无异,这决定了g2后续可以按预期运行。
后续主程序调用Lock方法,Mutex 实例变成 Locked状态,state字段值为 sync.mutex-Locked
此后g3创建时恰恰复制了处于 Locked状态的实例。
实例副本state字段值也为 sync.mutexLocked ,因此g3在对其实例副本调用Lock将导致进入阻塞
— 也就是死锁 因为没有任何其他计划调用该副本的Unlock方法,Go不支持递归锁—
那些sync包中类型的实例在首次使用后被复制得到的副本,一旦再被使用将导致不可预期结果,为此在使用sync包的类型时,
推荐通过闭包方式或传递类型实例(或包裹该类型的类型实例)的地址或指针进行,这是sync包最需要注意的。
互斥锁 sync.Mutex 临界区同步原语首选,常被用来对结构体对象内部状态,缓存进行保护。 使用最为广泛。
它通常被用以保护结构体内部状态,缓存,是广泛的临界区同步原语。
3 sync同步包读写锁
读写锁 RWMutex 有大并发需求的创建,使用读写锁。 RWMutex
读写锁适合具有一定并发量,并且读取操作明显大于写操作的场景。
例子2:
package main
import (
"sync"
"testing"
)
var (
临界区需要保护的数据
dataOne = 0
dataTwo = 1
mutexOne sync.Mutex
mutexTwo sync.RWMutex
)
互斥锁性能 读取
func BenchmarkReadSyncByMutex(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mutexOne.Lock()
_ = dataOne
mutexOne.Unlock()
}
})
}
互斥锁性能 写入
func BenchmarkWriteSyncByMutex(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mutexOne.Lock()
dataOne += 1
mutexOne.Unlock()
}
})
}
读取性能评估
func BenchmarkReadSyncByRWMutex(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mutexTwo.Lock()
_ = dataTwo
mutexTwo.Unlock()
}
})
}
写性能评估
func BenchmarkWriteSyncByRWMutex(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mutexTwo.Lock()
dataTwo += 1
mutexTwo.Unlock()
}
})
}
执行:
go test -v -count 2 -bench . mutex_rw_bench_test.go -cpu 2,4,8,32,128 >bm.txt
goarch: amd64
cpu: AMD Ryzen 5 3500U with Radeon Vega Mobile Gfx
BenchmarkReadSyncByMutex
BenchmarkReadSyncByMutex-2 30831019 40.23 ns/op
BenchmarkReadSyncByMutex-2 32428663 42.62 ns/op
BenchmarkReadSyncByMutex-4 10713606 114.1 ns/op
BenchmarkReadSyncByMutex-4 10344114 98.16 ns/op
BenchmarkReadSyncByMutex-8 10293854 116.5 ns/op
BenchmarkReadSyncByMutex-8 10168749 116.9 ns/op
BenchmarkReadSyncByMutex-32 11110328 111.3 ns/op
BenchmarkReadSyncByMutex-32 10753728 108.3 ns/op
BenchmarkReadSyncByMutex-128 12562038 98.00 ns/op
BenchmarkReadSyncByMutex-128 12499010 96.89 ns/op
BenchmarkWriteSyncByMutex
BenchmarkWriteSyncByMutex-2 17350693 67.81 ns/op
BenchmarkWriteSyncByMutex-2 15188412 66.77 ns/op
BenchmarkWriteSyncByMutex-4 9374296 125.0 ns/op
BenchmarkWriteSyncByMutex-4 10168714 126.8 ns/op
BenchmarkWriteSyncByMutex-8 9916609 119.1 ns/op
BenchmarkWriteSyncByMutex-8 9755517 121.1 ns/op
BenchmarkWriteSyncByMutex-32 10713538 113.9 ns/op
BenchmarkWriteSyncByMutex-32 10568701 113.5 ns/op
BenchmarkWriteSyncByMutex-128 11649591 102.3 ns/op
BenchmarkWriteSyncByMutex-128 11973096 102.5 ns/op
BenchmarkReadSyncByRWMutex
BenchmarkReadSyncByRWMutex-2 13524128 102.7 ns/op
BenchmarkReadSyncByRWMutex-2 11999124 101.4 ns/op
BenchmarkReadSyncByRWMutex-4 8391038 145.8 ns/op
BenchmarkReadSyncByRWMutex-4 14412699 126.1 ns/op
BenchmarkReadSyncByRWMutex-8 10525567 116.3 ns/op
BenchmarkReadSyncByRWMutex-8 10255752 116.4 ns/op
BenchmarkReadSyncByRWMutex-32 10255778 117.3 ns/op
BenchmarkReadSyncByRWMutex-32 10208638 117.9 ns/op
BenchmarkReadSyncByRWMutex-128 10810089 111.0 ns/op
BenchmarkReadSyncByRWMutex-128 11110348 108.1 ns/op
BenchmarkWriteSyncByRWMutex
BenchmarkWriteSyncByRWMutex-2 12499010 91.11 ns/op
BenchmarkWriteSyncByRWMutex-2 11999124 99.52 ns/op
BenchmarkWriteSyncByRWMutex-4 7842598 147.7 ns/op
BenchmarkWriteSyncByRWMutex-4 7946450 151.0 ns/op
BenchmarkWriteSyncByRWMutex-8 10210080 118.1 ns/op
BenchmarkWriteSyncByRWMutex-8 10168724 115.7 ns/op
BenchmarkWriteSyncByRWMutex-32 9835380 119.9 ns/op
BenchmarkWriteSyncByRWMutex-32 10339772 117.5 ns/op
BenchmarkWriteSyncByRWMutex-128 10908296 109.5 ns/op
BenchmarkWriteSyncByRWMutex-128 10810030 109.9 ns/op
PASS
简单分析:
1 在小并发量时,互斥锁性能更好,并发量增大,互斥锁竞争激烈,导致加锁和解锁性能下降,
但是最后也恒定在最好记录的2倍左右。
2 读写锁的读锁性能并未随着并发量增大而性能下降,始终在恒定值.
3 并发量较大时,读写锁的写锁性能比互斥锁,读写锁的读锁都差,并且随着并发量增大,写锁性能有继续下降趋势。
多个例程goroutine可以同时持有读锁,从而减少在锁竞争等待的时间。
而互斥锁即便为读请求,同一时刻也只能有一个例程持有锁,其他goroutine被阻塞在加锁操作等待被调度。
由于处于for循环测试中,需要注意的是,不能在 unlock时使用 defer,
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mutexTwo.Lock()
dataTwo += 1
defer mutexTwo.Unlock()
}
})
如此在并发执行时,函数不会退出,defer得不到执行,将导致全部死锁。
4 条件变量 sync.Cond
同步原语之一,用于避免轮询。
本质是一个容器,存放了一个或一组等待某条件的一些 协程 goroutine
当条件成立时,这些协程 goroutine将被唤醒去执行。
例子3: 条件变量的实现
import (
"fmt"
"sync"
"time"
)
使用sync.Cond 实例初始化
type signal struct{}
var ready bool
func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d:worker done\n", i)
}
func spawnGroup(f func(i int), num int, mu *sync.Mutex) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup
for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
for {
mu.Lock()
if !ready {
mu.Unlock()
time.Sleep(100 * time.Microsecond)
continue
}
mu.Unlock()
fmt.Printf("worker %d: start to work...\n", i)
f(i)
wg.Done()
return
}
}(i + 1)
}
go func() {
wg.Wait()
c <- signal(struct{}{})
}()
return c
}
模拟ready准备工作
func DoWorkers() {
fmt.Println("start a group of workers...")
mu := &sync.Mutex{}
c := spawnGroup(worker, 5, mu)
time.Sleep(5 * time.Second)
fmt.Println("the group of workers start to work...")
mu.Lock()
ready = true
mu.Unlock()
<-c
fmt.Println("the group of workers work done!")
}
func main() {
DoWorkers()
}
执行例子3:
go run .\cond_case.go
start a group of workers...
the group of workers start to work...
worker 5: start to work...
worker 5: is working...
worker 1: start to work...
worker 1: is working...
worker 2: start to work...
worker 2: is working...
worker 4: start to work...
worker 4: is working...
worker 3: start to work...
worker 3: is working...
worker 3:worker done
worker 5:worker done
worker 2:worker done
worker 1:worker done
worker 4:worker done
the group of workers work done!
我们看到sync.Cond 实例的初始化需要一个满足实现了sync.Locker 接口的类型实例,
通常我们使用sync.Mutex.
条件变量需要这个互斥锁同步临界区,保护用作条件的数据。
各个等待条件成立的例程 goroutine 在加锁后判断条件是否成立,如果不成立,则调用 sync.Cond 的Wait方法进入等待。
Wait方法在goroutine挂起前进行Unlock操作。
在执行函数,例程将ready置为true并调用sync.Cond 的Broadcast方法,各个阻塞的例程将被唤醒并从Wait方法返回。
在Wait方法返回前,Wait方法再次加锁让goroutine进入临界区。 接下来例程将再次对条件数据进行判定。
如果条件成立,则解锁进入下一个阶段,如果不成立,那么再次调用Wait方法挂起等待。
5 单例支持 sync.Once
在go原生只被执行一次且goroutine安全的函数只有每个的包的init函数。
sync.Once 提供了另一种更为灵活的机制,可以保证任意一个函数在程序运行期间只运行一次。
这经常被用于初始化和资源清理过程,以避免重复执行初始化或资源清理操作。
例子4: 单例的实现
package main
import (
"log"
"os"
"sync"
"time"
)
type Foo struct{}
var (
once sync.Once
instance *Foo
logger = log.New(os.Stdout, "info-", 18)
)
func GetInstance(id int) *Foo {
defer func() {
if e := recover(); e != nil {
logger.Printf("goroutine-%d:caught a panic:%s", id, e)
}
}()
logger.Printf("goroutine-%d:enter GetInstance\n", id)
once.Do(func() {
instance = &Foo{}
time.Sleep(3 * time.Second)
logger.Printf("goroutine-%d:the addr of instance is %p \n", id, instance)
panic("once.Do func panic")
})
return instance
}
func DoMains() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
inst := GetInstance(i)
logger.Printf("goroutine-%d, the addr of instance returned is %p\n", i, inst)
wg.Done()
}(i + 1)
}
time.Sleep(5 * time.Second)
inst := GetInstance(0)
logger.Printf("goroutine-0:the addr of instance is %p\n", inst)
wg.Wait()
logger.Printf("all goroutines exit.\n")
}
func main() {
DoMains()
}
执行结果:
info-10:48:39 once_pkg.go:25: goroutine-2:enter GetInstance
info-10:48:39 once_pkg.go:25: goroutine-5:enter GetInstance
info-10:48:39 once_pkg.go:25: goroutine-4:enter GetInstance
info-10:48:39 once_pkg.go:25: goroutine-3:enter GetInstance
info-10:48:39 once_pkg.go:25: goroutine-1:enter GetInstance
info-10:48:42 once_pkg.go:29: goroutine-2:the addr of instance is 0x1033f38
info-10:48:42 once_pkg.go:21: goroutine-2:caught a panic:once.Do func panic
info-10:48:42 once_pkg.go:41: goroutine-2, the addr of instance returned is 0x0
info-10:48:42 once_pkg.go:41: goroutine-5, the addr of instance returned is 0x1033f38
info-10:48:42 once_pkg.go:41: goroutine-4, the addr of instance returned is 0x1033f38
info-10:48:42 once_pkg.go:41: goroutine-3, the addr of instance returned is 0x1033f38
info-10:48:42 once_pkg.go:41: goroutine-1, the addr of instance returned is 0x1033f38
info-10:48:44 once_pkg.go:25: goroutine-0:enter GetInstance
info-10:48:44 once_pkg.go:47: goroutine-0:the addr of instance is 0x1033f38
info-10:48:44 once_pkg.go:49: all goroutines exit.
once.Do 将等待f执行完毕后返回,这期间其他执行once.Do函数的例程 将会阻塞等待。
如上 goroutine 2~5 的 Do函数返回后,后续的goroutine在执行Do函数将不再执行f并立即返回。
如上面的 gouroutine-0 即便在函数f中出现 panic,sync.Once 原语也认为once.Do 执行完毕,后续对once.Do调用将不再执行f。
5 sync.Pool 降低垃圾回收成本
官方介绍:
一个同步池是一组临时对象,可能是单独保存和检索的。
在池中存储的任何项目,都可以随时被删除,并且不会有删除通知。
如果池只有唯一的引用,那么该项目可能被剥离。
池可以被多个例程同时使用,并且是安全的。
池的目标是那些已经分配缓存但暂时没有使用的项目,以供再次使用。这将减轻GC压力。
这个特点使得它容易被用于建立高效,线程安全的列表。
但是它并不适合全部场景的自由大小的列表。 free lists
适当使用池管理一组临时项目,并务必在并发独立的包的独立客户端使用。
池提供了一种方式,支持跨多个客户端的摊销分配
使用池子的一个良好例子是 fmt 包,它用以维护一个动态大小的临时输出缓冲区。
当有许多goroutine例程在打印时,池子将在有负载时放大,在静止下缩小。
另一方面,一个不限制大小的列表作为 短周期对象不适合被用作池子。
该场景的开销不会被分摊。
短周期对象使用自己的自由列表将更高效。
池在使用后,不能被复制。
在Go内存模型术语中, Put(x) 被调用 synchronizes before
一个调用为获取返回同样的值 x。
类似地,一个调用到新的返回值 x synchronizes before
另一个调用为获取相同的值x。
type Pool struct {
noCopy noCopy
local unsafe.Pointer
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
New func() any
}
属性 local 分配固定尺寸的 池子,实际类型为 [P]poolLocal
属性 localSize 本地数组大小
属性 victim 从上一个循环来的local
属性 victimSize victims数组的大小
func() 可选择指定何时将返回 nil 时,生成值功能,它可能不会随时更改。
性能评估代码:
var (
p sync.Pool
)
基础功能 依次 放入,获取 一个
func BenchmarkPool(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p.Put(1)
p.Get()
}
})
}
连续放入100个,再连续拿取100个
func BenchmarkPoolOverflow(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for b := 0; b < 100; b++ {
p.Put(1)
}
for b := 0; b < 100; b++ {
p.Get()
}
}
})
}
模拟饥饿对象,以强制从其他池子拿取对象
func BenchmarkPoolStarvation(b *testing.B) {
var p sync.Pool
count := 100
将放入的对象数量减少33%
这将创建pool对象饥饿
并且强制 P-local 存储器 从其他池子Ps拿取对象。
countStarved := count - int(float32(count)*0.33)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for b := 0; b < countStarved; b++ {
p.Put(1)
}
for b := 0; b < count; b++ {
p.Get()
}
}
})
}
性能执行:
go test -v -count 2 -bench . .\pool_sync_test.go -cpu 2,4,8,32,128 >bmpool.txt
goarch: amd64
cpu: AMD Ryzen 5 3500U with Radeon Vega Mobile Gfx
BenchmarkPool
BenchmarkPool-2 92275749 12.69 ns/op
BenchmarkPool-2 100000000 12.57 ns/op
BenchmarkPool-4 205023169 5.613 ns/op
BenchmarkPool-4 228077668 6.074 ns/op
BenchmarkPool-8 321266732 3.493 ns/op
BenchmarkPool-8 345006420 3.483 ns/op
BenchmarkPool-32 349300358 3.690 ns/op
BenchmarkPool-32 310861339 3.449 ns/op
BenchmarkPool-128 339489630 3.491 ns/op
BenchmarkPool-128 360954796 3.460 ns/op
BenchmarkPoolOverflow
BenchmarkPoolOverflow-2 659786 1543 ns/op
BenchmarkPoolOverflow-2 749961 1728 ns/op
BenchmarkPoolOverflow-4 1295523 919.3 ns/op
BenchmarkPoolOverflow-4 1296963 933.8 ns/op
BenchmarkPoolOverflow-8 2085678 575.9 ns/op
BenchmarkPoolOverflow-8 2098604 583.7 ns/op
BenchmarkPoolOverflow-32 2099030 570.7 ns/op
BenchmarkPoolOverflow-32 2083220 571.3 ns/op
BenchmarkPoolOverflow-128 1942476 605.2 ns/op
BenchmarkPoolOverflow-128 2034183 604.0 ns/op
PASS
ok command-line-arguments 33.053s
6 小结
本文介绍sync包几个关键功能。
Mytex互斥锁,复制已锁定的Mutex会导致死锁。推荐通过指针使用这些类型。Mutex适用于保护临界区。
RWMutex适合读多写少的情况。
sync.Cond用于在条件满足时自动唤醒然后执行。
sync.Once确保函数只执行一次,常用于初始化。
sync.Pool用于缓存对象,减少GC压力,但使用需谨慎。
- 点赞
- 收藏
- 关注作者
评论(0)