协程任务执行框架示例
1 简介
通用 Goroutine 执行框架,本文将举例实现一个示例,并说明 在不同业务场景下如何使用和裁剪它。

本文这个框架的设计目标是:
统一并发执行 + 统一错误判断 + 可取消 + 可扩展
2 通用 Goroutine 执行框架(核心版)
- 1 设计目标
✔ 支持多个 goroutine 并发执行
✔ 统一收集错误
✔ 支持 context 超时 / 取消
✔ 可限制并发数
✔ 易扩展(结果、日志、监控)
-
2 核心结构定义
package executor import ( "context" "sync" "golang.org/x/sync/errgroup" ) type Task func(ctx context.Context) error type Executor struct { concurrency int } -
Executor 构造器
func NewExecutor(concurrency int) *Executor { if concurrency <= 0 { concurrency = 1 } return &Executor{ concurrency: concurrency, } } -
执行入口(核心方法)
func (e *Executor) Run(ctx context.Context, tasks []Task) error { g, ctx := errgroup.WithContext(ctx) sem := make(chan struct{}, e.concurrency) for _, task := range tasks { task := task g.Go(func() error { // 并发控制 sem <- struct{}{} defer func() { <-sem }() // 执行任务 return task(ctx) }) } return g.Wait() }
3 框架解决的问题
- 并发执行
多个任务同时执行
并发数可控(防止打爆 CPU / DB / 下游)
- 错误判断
任意 task 返回 error
Run() 直接返回错误
- 取消机制
任一任务失败 → ctx 自动 cancel
其余任务感知到 ctx.Done()
4 不同业务场景如何使用它
- 场景一:HTTP 并发调用多个下游服务
示例:一个接口需要并发调用 3 个 RPC
func Handler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()
executor := executor.NewExecutor(3)
tasks := []executor.Task{
func(ctx context.Context) error {
return callUserService(ctx)
},
func(ctx context.Context) error {
return callOrderService(ctx)
},
func(ctx context.Context) error {
return callPayService(ctx)
},
}
if err := executor.Run(ctx, tasks); err != nil {
http.Error(w, err.Error(), 500)
return
}
w.Write([]byte("success"))
}
- 适用点:
请求级并发
统一超时
快速失败
5 批量任务处理(如消息消费)
示例:批量处理 MQ 消息
func Consume(messages []Message) error {
ctx := context.Background()
executor := executor.NewExecutor(10)
tasks := make([]executor.Task, 0, len(messages))
for _, msg := range messages {
msg := msg
tasks = append(tasks, func(ctx context.Context) error {
return handleMessage(ctx, msg)
})
}
return executor.Run(ctx, tasks)
}
- 适用点:
高吞吐
并发可控
任一失败可整体失败或重试
6 部分失败可接受(需要改造)
如果你的业务是:
允许部分任务失败
你可以改造 Task 返回值:
type TaskResult struct {
Err error
}
或者:
func (e *Executor) RunWithResult(
ctx context.Context,
tasks []Task,
) []error {
errCh := make(chan error, len(tasks))
var wg sync.WaitGroup
sem := make(chan struct{}, e.concurrency)
for _, task := range tasks {
wg.Add(1)
go func(task Task) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
errCh <- task(ctx)
}(task)
}
wg.Wait()
close(errCh)
var errs []error
for err := range errCh {
if err != nil {
errs = append(errs, err)
}
}
return errs
}
-
定时任务 / 后台任务
func CronJob() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() executor := executor.NewExecutor(5) tasks := []executor.Task{ syncData, cleanCache, rebuildIndex, } if err := executor.Run(ctx, tasks); err != nil { log.Error("cron failed:", err) } }
6 完整增强版
- 加任务名 + 日志
type NamedTask struct {
Name string
Task Task
}
- panic 保护
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf(“panic: %v”, r)
}
}()
- metrics 统计
task_success_total
task_fail_total
task_duration
7、小结
以下场景不要使用该框架
- 需要任务之间有严格顺序
- goroutine 生命周期需要长期存在(worker pool)
- 需要返回大量结果流(适合 channel pipeline)
关键实现点
goroutine 不做判断,Task 返回 error
Executor 只负责并发和生命周期
context 是取消和正确性的唯一信号
- 点赞
- 收藏
- 关注作者
评论(0)