协程任务执行框架示例

举报
码乐 发表于 2026/01/22 10:38:26 2026/01/22
【摘要】 1 简介通用 Goroutine 执行框架,本文将举例实现一个示例,并说明 在不同业务场景下如何使用和裁剪它。本文这个框架的设计目标是: 统一并发执行 + 统一错误判断 + 可取消 + 可扩展 2 通用 Goroutine 执行框架(核心版)1 设计目标✔ 支持多个 goroutine 并发执行✔ 统一收集错误✔ 支持 context 超时 / 取消✔ 可限制并发数✔ 易扩展(结果、...

1 简介

通用 Goroutine 执行框架,本文将举例实现一个示例,并说明 在不同业务场景下如何使用和裁剪它。

本文这个框架的设计目标是:

				统一并发执行 + 统一错误判断 + 可取消 + 可扩展

2 通用 Goroutine 执行框架(核心版)

  • 1 设计目标

✔ 支持多个 goroutine 并发执行
✔ 统一收集错误
✔ 支持 context 超时 / 取消
✔ 可限制并发数
✔ 易扩展(结果、日志、监控)

  • 2 核心结构定义

      package executor
    
      import (
          "context"
          "sync"
    
          "golang.org/x/sync/errgroup"
      )
    
      type Task func(ctx context.Context) error
    
      type Executor struct {
          concurrency int
      }
    
  • Executor 构造器

    func NewExecutor(concurrency int) *Executor {
        if concurrency <= 0 {
            concurrency = 1
        }
        return &Executor{
            concurrency: concurrency,
        }
    }
    
  • 执行入口(核心方法)

      func (e *Executor) Run(ctx context.Context, tasks []Task) error {
          g, ctx := errgroup.WithContext(ctx)
    
          sem := make(chan struct{}, e.concurrency)
    
          for _, task := range tasks {
              task := task
    
              g.Go(func() error {
                  // 并发控制
                  sem <- struct{}{}
                  defer func() { <-sem }()
    
                  // 执行任务
                  return task(ctx)
              })
          }
    
          return g.Wait()
      }
    

3 框架解决的问题

  • 并发执行

多个任务同时执行

并发数可控(防止打爆 CPU / DB / 下游)

  • 错误判断

任意 task 返回 error

Run() 直接返回错误

  • 取消机制

任一任务失败 → ctx 自动 cancel

其余任务感知到 ctx.Done()

4 不同业务场景如何使用它

  • 场景一:HTTP 并发调用多个下游服务

示例:一个接口需要并发调用 3 个 RPC

    func Handler(w http.ResponseWriter, r *http.Request) {
        ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
        defer cancel()

        executor := executor.NewExecutor(3)

        tasks := []executor.Task{
            func(ctx context.Context) error {
                return callUserService(ctx)
            },
            func(ctx context.Context) error {
                return callOrderService(ctx)
            },
            func(ctx context.Context) error {
                return callPayService(ctx)
            },
        }

        if err := executor.Run(ctx, tasks); err != nil {
            http.Error(w, err.Error(), 500)
            return
        }

        w.Write([]byte("success"))
    }
  • 适用点:

请求级并发

统一超时

快速失败

5 批量任务处理(如消息消费)

示例:批量处理 MQ 消息

    func Consume(messages []Message) error {
        ctx := context.Background()
        executor := executor.NewExecutor(10)

        tasks := make([]executor.Task, 0, len(messages))

        for _, msg := range messages {
            msg := msg
            tasks = append(tasks, func(ctx context.Context) error {
                return handleMessage(ctx, msg)
            })
        }

        return executor.Run(ctx, tasks)
    }
  • 适用点:

高吞吐

并发可控

任一失败可整体失败或重试

6 部分失败可接受(需要改造)

如果你的业务是:

允许部分任务失败

你可以改造 Task 返回值:

  type TaskResult struct {
      Err error
  }

或者:

    func (e *Executor) RunWithResult(
        ctx context.Context,
        tasks []Task,
    ) []error {
        errCh := make(chan error, len(tasks))
        var wg sync.WaitGroup

        sem := make(chan struct{}, e.concurrency)

        for _, task := range tasks {
            wg.Add(1)
            go func(task Task) {
                defer wg.Done()
                sem <- struct{}{}
                defer func() { <-sem }()
                errCh <- task(ctx)
            }(task)
        }

        wg.Wait()
        close(errCh)

        var errs []error
        for err := range errCh {
            if err != nil {
                errs = append(errs, err)
            }
        }
        return errs
    }
  • 定时任务 / 后台任务

      func CronJob() {
          ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
          defer cancel()
    
          executor := executor.NewExecutor(5)
    
          tasks := []executor.Task{
              syncData,
              cleanCache,
              rebuildIndex,
          }
    
          if err := executor.Run(ctx, tasks); err != nil {
              log.Error("cron failed:", err)
          }
      }
    

6 完整增强版

  • 加任务名 + 日志

type NamedTask struct {
Name string
Task Task
}

  • panic 保护

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf(“panic: %v”, r)
}
}()

  • metrics 统计

task_success_total

task_fail_total

task_duration

7、小结

以下场景不要使用该框架

  • 需要任务之间有严格顺序
  • goroutine 生命周期需要长期存在(worker pool)
  • 需要返回大量结果流(适合 channel pipeline)

关键实现点

goroutine 不做判断,Task 返回 error
Executor 只负责并发和生命周期
context 是取消和正确性的唯一信号

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。