Golang 并发模式:For-Select-Done、errgroup 和 worker pool

举报
Rolle 发表于 2024/01/17 23:54:44 2024/01/17
697 0 0
【摘要】 Go 中非常方便的三种并发模式for-select-doneerrgroupworker pool模式 1:For-Select-Donefor-select-done 模式的主要思想是使用无限 for 循环来处理来自各种通道的事件。 select该 select 语句允许从多个通道中选择准备执行的第一个操作。它可以是执行一些有用任务或退出无限循环的信号。在这种模式中,无限 for 循环通常...

Go 中非常方便的三种并发模式


  • for-select-done
  • errgroup
  • worker pool

模式 1:For-Select-Done

for-select-done 模式的主要思想是使用无限 for 循环来处理来自各种通道的事件。 select


select 语句允许从多个通道中选择准备执行的第一个操作。它可以是执行一些有用任务或退出无限循环的信号。

在这种模式中,无限 for 循环通常在单独的 goroutine 中调用,以避免阻塞主线程。


代码示例

我们需要每秒调用一次函数 someTask() 。在某个点上,我们还应该能够根据上下文退出无限循环并终止 goroutine。


看看代码示例:

package main

import (
  "context"
  "fmt"
  "math/rand"
  "os"
  "os/signal"
  "syscall"
  "time"
)

func someTask() {
  fmt.Println(rand.Int() * rand.Int())
}
func PeriodicTask(ctx context.Context) {
  ticker := time.NewTicker(time.Second)
  for {
    select {
    case <-ticker.C:
      someTask()
    case <-ctx.Done():
      fmt.Println("stopping PeriodicTask")
      ticker.Stop()
      return
    }
  }
}

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  go PeriodicTask(ctx)

  sigCh := make(chan os.Signal, 1)
  signal.Notify(sigCh, syscall.SIGTERM)

  <-sigCh
}

为了定期调用 someTask() ,我们将创建一个周期为 1 秒的新代码。每秒都会向 ticker.C 通道发送一条消息,该消息在相应的 case 语句中被读取并触发 someTask() 函数的执行。


如果取消了上下文,则会向通道 <-ctx.Done() 发送一条消息,并触发相应的 case,这将退出 for 循环和 goroutine。


在 main 函数中,我们创建一个超时为 5 秒的上下文 ctx 。这意味着,如果与此上下文关联的操作未在指定时间内完成,则该上下文将被取消,并且与之关联的所有操作都将中断。


PeriodicTask goroutine 的无限循环中,ticker 会多次触发, someTask() 函数会多次执行。5 秒后,将触发上下文代码, select 语句中将触发 case <-ctx.Done() ,无限循环将终止。


代码执行结果:

-777992493516638130
-3179832721048378793
-4070697154687973288
2884823370254822744
stopping PeriodicTask

何时使用此模式

当需要基于某个事件或计时器在无限循环中执行任务,然后根据特定条件停止其执行时,此模式非常有用。


例如,它可用于使用保存到数据库的数据运行延迟计算,或者使用来自其他服务的数据异步扩充数据库中的记录。同时,当上下文被取消或发生其他外部事件时,我们始终能够安全地终止 goroutine。

模式 2:错误组

errgroup 模式的主要思想是启动一组 goroutines,等待它们完成工作,并处理执行过程中可能发生的任何错误。


代码示例

下面是使用 golang.org/x/sync/errgroup 包的示例代码,该包实现了 errgroup 模式。

package main

import (
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// errFailure some custom error.
var errFailure = errors.New("some error")

func main() {
    // Create errgroup.
    group := errgroup.Group{}

    // Run first task.
    group.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("doing some work 1")
        return nil
    })

    // Run second task.
    group.Go(func() error {
        fmt.Println("doing some work 2")
        return nil
    })

    // Run third task.
    group.Go(func() error {
        fmt.Println("doing some work 3")
        return errFailure
    })

    // Wait for all goroutines to complete.
    if err := group.Wait(); err != nil {
        fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
    } else {
        fmt.Println("all works done successfully")
    }
}

在此示例中 errgroup.Group{} ,我们调用任务组,该任务组使用该 group.Go() 方法并行执行。


我们习惯于 group.Wait() 等待组中的所有任务完成。如果任何任务以错误结束,该 group.Wait() 方法将返回它收到的第一个错误。如果所有任务都成功完成,该 group.Wait() 方法将返回 nil


在此示例中,第三个任务以错误完成,因此 group.Wait() 返回一个错误,我们处理该错误。


代码执行结果:

doing some work 3
doing some work 2
doing some work 1
errgroup tasks ended up with an error: some error

让我们考虑此模式的另一个用例。

errgroup.WithContext() 函数创建一组新的 goroutines errgroup.Group 类型和一个新的 context.Context ,它可以在 goroutines 之间传递,并在必要时允许取消任务组的执行。

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// errFailure some custom error.
var errFailure = errors.New("some error")

func main() {
    // Create errgroup with context.
    group, qctx := errgroup.WithContext(context.Background())

    // Run first periodic task.
    group.Go(func() error {
        firstTask(qctx)
        return nil
    })

    // Run second task.
    group.Go(func() error {
        if err := secondTask(); err != nil {
            return err
        }
        return nil
    })

    // Wait for all tasks to complete or the error to appear.
    if err := group.Wait(); err != nil {
        fmt.Printf("errgroup tasks ended up with an error: %v", err)
    }
}

func firstTask(ctx context.Context) {
    var counter int
    for {
        select {
            case <-ctx.Done():
            return
            case <-time.After(500 * time.Millisecond):
            fmt.Println("some task")
            if counter > 10 {
                return
            }
            counter++
        }
    }
}

func secondTask() error {
    time.Sleep(3 * time.Second)
    return errFailure
}

在这里,该 firstTask() 函数表示需要执行某些操作 10 次的周期性任务。该 secondTask() 任务是执行某些任务但返回错误的函数。

在这种情况下,我们的定期任务在 secondTask() 任务 firstTask() 完成之前设法完成几次,并出现错误。此错误会导致 goroutine 组的 qctx 上下文被取消,并且它们将终止其执行。


代码执行结果:

some task
some task
some task
some task
some task
errgroup tasks ended up with an error: some error

何时使用此模式

使用这种模式,其中组中的所有 goroutines 都必须成功完成而不会出错。


例如,如果我需要使用组合数据执行计算。如果特定表中的数据丢失(例如,尚未保存),我将无法执行计算。在本例中,我返回有关缺失数据的自定义错误,并且所有其他数据库查询的执行将中断,直到下一个任务组运行。


模式 3:工作线程池

工作池模式是一种允许任务并行化的模式,限制了同时执行的 goroutine 的数量。


在此模式中,我们创建固定数量的工作线程,用于等待队列中的任务。当任务出现时,该任务将添加到队列中。如果工作线程是空闲的,它会从队列中获取任务并执行它。任务执行的结果可以返回到主线程,在那里可以进行处理。

代码示例

让我们考虑工作线程池的变体之一。这里我们不会将任务执行的结果返回给主线程。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Data to be proccessed.
var taskCount = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

const (
    // Number of concurrent workers.
    numberOfWorkers = 3
)

func main() {
    // Create buffered channel.
    jobs := make(chan struct{}, numberOfWorkers)
    wg := sync.WaitGroup{}

    // Add workers.
    for id := range taskCount {
        wg.Add(1)
        jobs <- struct{}{}

        go func(id int) {
            worker(id)
            <-jobs
            defer wg.Done()
        }(id)
    }

    // Wait for all workers to complete.
    wg.Wait()
}

func worker(id int) {
    fmt.Println(id)
    time.Sleep(2 * time.Second)
}

切片 taskCount 包含需要处理的数据(共 10 个元素)。在常量 numberOfWorkers 中,我们设置并发工作线程的数量——队列的大小。


接下来,我们创建一个缓冲区 struct{} 大小为 numberOfWorkers 的缓冲通道 jobs


为了等待所有任务完成,我们创建一个 WaitGroup .使用循环 for ,我们遍历 taskCount 切片中的所有数据,将 WaitGroup 计数器递增 1,并将任务添加到通道队列中 jobs 进行处理。


然后,通过 goroutine 调用我们的工作线程处理程序,该处理程序处理数据,然后使用 wg.Done() ,将 WaitGroup 计数器递减 1 从队列中删除任务。


使用 wg.Wait() ,我们等待所有 goroutines 完成,直到组计数器变为零。我们有 3 个 worker numberOfWorkers ,因此前三个 goroutine 将同时执行,在第四个任务中,作业 <-struct{}{} 行将被阻塞,直到其中一个 worker 完成其工作并从队列中 <-jobs 获取下一个项目。


何时使用此模式

当需要处理大量任务时,工作池模式很有用,但我们希望限制并发执行的 goroutine 的数量,这将对代码性能产生积极影响并避免系统过载。此模式还允许通过增加允许的并发工作线程数来轻松扩展系统。


工作线程池可用于在服务器端处理客户端请求或执行后台任务,例如报告生成或数据处理。


当需要在数据库中处理和保存大量数据行时,我经常使用此模式。此外,此模式对于处理来自分布式队列(如 Kafka)的事件非常方便。

结论

有许多并发模式,但在这里,我介绍了三种高级并发模式,人们可以在他们的工作中使用它们。如果有兴趣了解其他并发模式,阅读有关扇入、扇出、管道和桥接通道模式的信息。



【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。