Golang 并发模式:For-Select-Done、errgroup 和 worker pool
【摘要】 Go 中非常方便的三种并发模式for-select-doneerrgroupworker pool模式 1:For-Select-Donefor-select-done 模式的主要思想是使用无限 for 循环来处理来自各种通道的事件。 select该 select 语句允许从多个通道中选择准备执行的第一个操作。它可以是执行一些有用任务或退出无限循环的信号。在这种模式中,无限 for 循环通常...
Go 中非常方便的三种并发模式
- for-select-done
- errgroup
- worker pool
模式 1:For-Select-Done
for-select-done 模式的主要思想是使用无限 for 循环来处理来自各种通道的事件。 select
该 select 语句允许从多个通道中选择准备执行的第一个操作。它可以是执行一些有用任务或退出无限循环的信号。
在这种模式中,无限 for 循环通常在单独的 goroutine 中调用,以避免阻塞主线程。
代码示例
我们需要每秒调用一次函数 someTask() 。在某个点上,我们还应该能够根据上下文退出无限循环并终止 goroutine。
看看代码示例:
package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
)
func someTask() {
fmt.Println(rand.Int() * rand.Int())
}
func PeriodicTask(ctx context.Context) {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
someTask()
case <-ctx.Done():
fmt.Println("stopping PeriodicTask")
ticker.Stop()
return
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go PeriodicTask(ctx)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM)
<-sigCh
}
为了定期调用 someTask() ,我们将创建一个周期为 1 秒的新代码。每秒都会向 ticker.C 通道发送一条消息,该消息在相应的 case 语句中被读取并触发 someTask() 函数的执行。
如果取消了上下文,则会向通道 <-ctx.Done() 发送一条消息,并触发相应的 case,这将退出 for 循环和 goroutine。
在 main 函数中,我们创建一个超时为 5 秒的上下文 ctx 。这意味着,如果与此上下文关联的操作未在指定时间内完成,则该上下文将被取消,并且与之关联的所有操作都将中断。
在 PeriodicTask goroutine 的无限循环中,ticker 会多次触发, someTask() 函数会多次执行。5 秒后,将触发上下文代码, select 语句中将触发 case <-ctx.Done() ,无限循环将终止。
代码执行结果:
-777992493516638130
-3179832721048378793
-4070697154687973288
2884823370254822744
stopping PeriodicTask
何时使用此模式
当需要基于某个事件或计时器在无限循环中执行任务,然后根据特定条件停止其执行时,此模式非常有用。
例如,它可用于使用保存到数据库的数据运行延迟计算,或者使用来自其他服务的数据异步扩充数据库中的记录。同时,当上下文被取消或发生其他外部事件时,我们始终能够安全地终止 goroutine。
模式 2:错误组
errgroup 模式的主要思想是启动一组 goroutines,等待它们完成工作,并处理执行过程中可能发生的任何错误。
代码示例
下面是使用 golang.org/x/sync/errgroup 包的示例代码,该包实现了 errgroup 模式。
package main
import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
)
// errFailure some custom error.
var errFailure = errors.New("some error")
func main() {
// Create errgroup.
group := errgroup.Group{}
// Run first task.
group.Go(func() error {
time.Sleep(5 * time.Second)
fmt.Println("doing some work 1")
return nil
})
// Run second task.
group.Go(func() error {
fmt.Println("doing some work 2")
return nil
})
// Run third task.
group.Go(func() error {
fmt.Println("doing some work 3")
return errFailure
})
// Wait for all goroutines to complete.
if err := group.Wait(); err != nil {
fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
} else {
fmt.Println("all works done successfully")
}
}
在此示例中 errgroup.Group{} ,我们调用任务组,该任务组使用该 group.Go() 方法并行执行。
我们习惯于 group.Wait() 等待组中的所有任务完成。如果任何任务以错误结束,该 group.Wait() 方法将返回它收到的第一个错误。如果所有任务都成功完成,该 group.Wait() 方法将返回 nil 。
在此示例中,第三个任务以错误完成,因此 group.Wait() 返回一个错误,我们处理该错误。
代码执行结果:
doing some work 3
doing some work 2
doing some work 1
errgroup tasks ended up with an error: some error
让我们考虑此模式的另一个用例。
该 errgroup.WithContext() 函数创建一组新的 goroutines errgroup.Group 类型和一个新的 context.Context ,它可以在 goroutines 之间传递,并在必要时允许取消任务组的执行。
package main
import (
"context"
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
// errFailure some custom error.
var errFailure = errors.New("some error")
func main() {
// Create errgroup with context.
group, qctx := errgroup.WithContext(context.Background())
// Run first periodic task.
group.Go(func() error {
firstTask(qctx)
return nil
})
// Run second task.
group.Go(func() error {
if err := secondTask(); err != nil {
return err
}
return nil
})
// Wait for all tasks to complete or the error to appear.
if err := group.Wait(); err != nil {
fmt.Printf("errgroup tasks ended up with an error: %v", err)
}
}
func firstTask(ctx context.Context) {
var counter int
for {
select {
case <-ctx.Done():
return
case <-time.After(500 * time.Millisecond):
fmt.Println("some task")
if counter > 10 {
return
}
counter++
}
}
}
func secondTask() error {
time.Sleep(3 * time.Second)
return errFailure
}
在这里,该 firstTask() 函数表示需要执行某些操作 10 次的周期性任务。该 secondTask() 任务是执行某些任务但返回错误的函数。
在这种情况下,我们的定期任务在 secondTask() 任务 firstTask() 完成之前设法完成几次,并出现错误。此错误会导致 goroutine 组的 qctx 上下文被取消,并且它们将终止其执行。
代码执行结果:
some task
some task
some task
some task
some task
errgroup tasks ended up with an error: some error
何时使用此模式
使用这种模式,其中组中的所有 goroutines 都必须成功完成而不会出错。
例如,如果我需要使用组合数据执行计算。如果特定表中的数据丢失(例如,尚未保存),我将无法执行计算。在本例中,我返回有关缺失数据的自定义错误,并且所有其他数据库查询的执行将中断,直到下一个任务组运行。
模式 3:工作线程池
工作池模式是一种允许任务并行化的模式,限制了同时执行的 goroutine 的数量。
在此模式中,我们创建固定数量的工作线程,用于等待队列中的任务。当任务出现时,该任务将添加到队列中。如果工作线程是空闲的,它会从队列中获取任务并执行它。任务执行的结果可以返回到主线程,在那里可以进行处理。
代码示例
让我们考虑工作线程池的变体之一。这里我们不会将任务执行的结果返回给主线程。
package main
import (
"fmt"
"sync"
"time"
)
// Data to be proccessed.
var taskCount = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
const (
// Number of concurrent workers.
numberOfWorkers = 3
)
func main() {
// Create buffered channel.
jobs := make(chan struct{}, numberOfWorkers)
wg := sync.WaitGroup{}
// Add workers.
for id := range taskCount {
wg.Add(1)
jobs <- struct{}{}
go func(id int) {
worker(id)
<-jobs
defer wg.Done()
}(id)
}
// Wait for all workers to complete.
wg.Wait()
}
func worker(id int) {
fmt.Println(id)
time.Sleep(2 * time.Second)
}
切片 taskCount 包含需要处理的数据(共 10 个元素)。在常量 numberOfWorkers 中,我们设置并发工作线程的数量——队列的大小。
接下来,我们创建一个缓冲区 struct{} 大小为 numberOfWorkers 的缓冲通道 jobs 。
为了等待所有任务完成,我们创建一个 WaitGroup .使用循环 for ,我们遍历 taskCount 切片中的所有数据,将 WaitGroup 计数器递增 1,并将任务添加到通道队列中 jobs 进行处理。
然后,通过 goroutine 调用我们的工作线程处理程序,该处理程序处理数据,然后使用 wg.Done() ,将 WaitGroup 计数器递减 1 从队列中删除任务。
使用 wg.Wait() ,我们等待所有 goroutines 完成,直到组计数器变为零。我们有 3 个 worker numberOfWorkers ,因此前三个 goroutine 将同时执行,在第四个任务中,作业 <-struct{}{} 行将被阻塞,直到其中一个 worker 完成其工作并从队列中 <-jobs 获取下一个项目。
何时使用此模式
当需要处理大量任务时,工作池模式很有用,但我们希望限制并发执行的 goroutine 的数量,这将对代码性能产生积极影响并避免系统过载。此模式还允许通过增加允许的并发工作线程数来轻松扩展系统。
工作线程池可用于在服务器端处理客户端请求或执行后台任务,例如报告生成或数据处理。
当需要在数据库中处理和保存大量数据行时,我经常使用此模式。此外,此模式对于处理来自分布式队列(如 Kafka)的事件非常方便。
结论
有许多并发模式,但在这里,我介绍了三种高级并发模式,人们可以在他们的工作中使用它们。如果有兴趣了解其他并发模式,阅读有关扇入、扇出、管道和桥接通道模式的信息。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)