浅析golang中的context

举报
苏州程序大白 发表于 2022/03/21 13:56:58 2022/03/21
【摘要】 简单分析context库的源码以及使用1.context是什么go1.7才引入context,译作“上下文”,实际也叫goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息、context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。与WaitGroup最大的不同点是context对于派生goroutine...

简单分析context库的源码以及使用

1.context是什么

go1.7才引入context,译作“上下文”,实际也叫goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息、context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。与WaitGroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine

随着 context 包的引入,标准库中很多接口加上了 context 参数,例如 database/sql 包、http包。context 几乎成为了并发控制和超时控制的标准做法,由于context的源码里用到了大量的mutex锁用于保护子级的context,所以它是并发安全的

2.context接口的实现

context接口只定义了4中方法

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  • Deadline 返回此上下文完成的工作的截止时间,未设置截止日期时,返回 ok==false,对 Deadline 的连续调用返回相同的结果

  • Done 返回一个channel,可以表示 context 被取消的信号,这是一个只读的channel,当这个 channel 被关闭时,说明 context 被取消了,而且读一个关闭的 channel 会读出相应类型的零值(channel对应的零值是nil)。常用在select-case语句中,如case <-context.Done():

  • Err 描述context关闭的原因,由context实现控制,不需要用户设置,例如是被取消,还是超时,主动取消的就返回context canceled,因超时关闭就返回context deadline exceeded

  • Value 用于在树状分布的goroutine间传递信息,根据key值查询map中的value

3.实现context接口的几种结构体#

整体类图

3.1 emptyCtx

type emptyCtx int

context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现了Context,本身不包含任何值,仅用于其他context的父节点

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}
func (*emptyCtx) Done() <-chan struct{} {
	return nil
}
func (*emptyCtx) Err() error {
	return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}
func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}
var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)
func Background() Context {
	return background
}
func TODO() Context {
	return todo
}

emptyCtx是一个int类型的变量,但实现了context的接口。emptyCtx没有超时时间,不能取消,也不能存储任何额外信息,所以emptyCtx用来作为context树的根节点

  • background 通常用在 main 函数中,作为所有 context 的根节点

  • todo 通常用在并不知道传递什么context的情形,相当于用 todo 占个位子,最终要换成其他 context

3.2 cancelCtx

这是一个可以取消的context

type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}
type cancelCtx struct {
	Context
	mu       sync.Mutex           
	done     atomic.Value         
	children map[canceler]struct{}
	err      error                
}

cancelCtx将接口 Context 作为它的一个匿名字段,因此可以被看成是一个 Context,同时cancelCtx实现了 canceler 接口。children中记录了由此context派生的所有child,此context被cancel时会把其中的所有child都cancel掉,cancelCtx与deadline和value无关

func (c *cancelCtx) Done() <-chan struct{} {
	d := c.done.Load()
	if d != nil {
		return d.(chan struct{})
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	d = c.done.Load()
	if d == nil {
		d = make(chan struct{})
		c.done.Store(d)
	}
	return d.(chan struct{})
}
func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}
  • Done 返回一个只读的channel,而且没有地方向这个 channel 里面写数据,所以直接读这个channel协程会被阻塞住,一般通过搭配 select 来使用,一旦关闭,就会立即读出对应类型的零值

  • Err 则是返回对应的错误类型

  • cancel 关闭 c.done,取消 c 的每个子节点,如果 removeFromParent 为真,则从其父节点的子节点中删除 c,总体来说就是删除自己和其后代,不会影响到父节点和其它分支的节点,这里删除子节点调用的是removeChild,可见调用了delete

    func removeChild(parent Context, child canceler) {
    	p, ok := parentCancelCtx(parent)
    	if !ok {
    		return
    	}
    	p.mu.Lock()
    	if p.children != nil {
    		delete(p.children, child)
    	}
    	p.mu.Unlock()
    }
    

有一个WithCancel方法,会暴露给写代码的人调用:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent) //①
	propagateCancel(parent, &c) //②
	return &c, func() { c.cancel(true, Canceled) } //③
}

具体实现分三步:①初始化一个cancelCtx实例,②如果父节点也可以被cancel,将cancelCtx实例添加到其父节点的children中,③返回cancelCtx实例和cancel方法

第②步调用的函数propagateCancel代码逻辑可以细分为3步:

  • 如果父节点支持cancel,则父节点有children成员,可以把新context添加到children里
  • 如果父节点不支持cancel,继续向上查询直到找到一个支持cancel的节点,把新context添加到children里
  • 如果所有的父节点均不支持cancel,则启动一个协程等待父节点结束,然后再把当前context结束
func propagateCancel(parent Context, child canceler) {
   done := parent.Done()
   if done == nil {
      return // parent is never canceled
   }

   select {
   case <-done:
      // parent is already canceled
      child.cancel(false, parent.Err())
      return
   default:
   }

   if p, ok := parentCancelCtx(parent); ok {
      p.mu.Lock()
      if p.err != nil {
         // parent has already been canceled
         child.cancel(false, p.err)
      } else {
         if p.children == nil {
            p.children = make(map[canceler]struct{})
         }
         p.children[child] = struct{}{}
      }
      p.mu.Unlock()
   } else {
      atomic.AddInt32(&goroutines, +1)
      go func() {
         select {
         case <-parent.Done():
            child.cancel(false, parent.Err())
         case <-child.Done():
         }
      }()
   }
}

3.3 timerCtx

type timerCtx struct {
	cancelCtx
	timer *time.Timer 
	deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

timerCtx是基于cancelCtx的,此外多了timer和deadline,timer指最长存活时间,比如将在多少秒后结束,deadline表示最后期限,需要指定具体的截止日期,由此衍生出了WithDeadline()和WithTimeout()函数

3.4 vauleCtx

type valueCtx struct {
	Context
	key, val interface{}
}

valueCtx在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据,valueCtx既不需要cancel,也不需要deadline,只需要实现Value()接口

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

因此有了WithVaule函数

func WithValue(parent Context, key, val interface{}) Context {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   if key == nil {
      panic("nil key")
   }
   if !reflectlite.TypeOf(key).Comparable() {
      panic("key is not comparable")
   }
   return &valueCtx{parent, key, val}
}

WithVaule可以用来设置键值对


从这些源代码中可以看出使用了大量的锁,保证了执行过程中的并发安全

4.context的使用

4.1 异步调用链

下面这个例子的流程图如下

假设一个例子,genGreeting在放弃调用locale之前等待一秒——超时时间为1秒,如果printGreeting不成功,就取消对printFare的调用

可以看到系统输出工作正常,由于local设置至少需要运行一分钟,因此genGreeting将始终超时,这意味着main会始终取消printFarewell下面的调用链,

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    wg.Add(1)
    go func() {
        defer wg.Done()

        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            cancel()
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(ctx); err != nil {
            fmt.Printf("cannot print farewell: %v\n", err)
        }
    }()

    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(ctx context.Context) error {
    farewell, err := genFarewell(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second) //只等1秒钟就取消
    defer cancel()

    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err() 
    case <-time.After(1 * time.Minute):	//等待一分钟后执行
    }
    return "EN/US", nil
}

//结果
cannot print greeting: context deadline exceeded
cannot print farewell: context canceled

可以在这个程序上进一步改进:因为已知locale需要大约一分钟的时间才能运行,所以可以在locale中检查是否给出了deadline,如果给出了,则返回一个context包预设的错误——DeadlineExceeded

可以看到最终结果是一样的,但是会马上得出执行结果,而不会被阻塞1秒钟

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    wg.Add(1)
    go func() {
        defer wg.Done()

        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            cancel()
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(ctx); err != nil {
            fmt.Printf("cannot print farewell: %v\n", err)
        }
    }()

    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(ctx context.Context) error {
    farewell, err := genFarewell(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
    defer cancel()

    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    if deadline, ok := ctx.Deadline(); ok { //1
        if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
            return "", context.DeadlineExceeded
        }
    }

    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(1 * time.Minute):
    }
    return "EN/US", nil
}

//结果
cannot print greeting: context deadline exceeded
cannot print farewell: context canceled

4.2 协程取消信号同步

在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作,如下例可以采用channel的方式控制,这里采用主协程main控制通道的关闭,子协程监听done,一旦主协程关闭了channel,那么子协程就可以退出了

因为这个例子还不复杂,所以用通道控制感觉还可以,但是当有多个主协程和多个子协程时,就要定义多个done channel,这将变得非常混乱

func main() {
   messages := make(chan int, 10)
   done := make(chan bool)

   defer close(messages)
   // consumer
   go func() {
      ticker := time.NewTicker(1 * time.Second)
      for _ = range ticker.C {
         select {
         case <-done:
            fmt.Println("child process interrupt...")
            return
         default:
            fmt.Printf("send message: %d\n", <-messages)
         }
      }
   }()

   // producer
   for i := 0; i < 10; i++ {
      messages <- i
   }
   time.Sleep(5 * time.Second)
   close(done)
   time.Sleep(1 * time.Second)
   fmt.Println("main process exit!")
}

//结果
send message: 0
send message: 1
send message: 2
send message: 3
send message: 4
child process interrupt...
main process exit!

可以试试用context改写解决这个问题,如下例,只要让子线程监听主线程传入的ctx,一旦ctx.Done()返回空channel,子线程即可取消执行任务

func main() {
	messages := make(chan int, 10)

	// producer
	for i := 0; i < 10; i++ {
		messages <- i
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

	// consumer
	go func(ctx context.Context) {
		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			case <-ctx.Done():
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("send message: %d\n", <-messages)
			}
		}
	}(ctx)

	defer close(messages)
	defer cancel()

	select {
	case <-ctx.Done():
		time.Sleep(1 * time.Second)
		fmt.Println("main process exit!")
	}
}

//结果
send message: 0
send message: 1
send message: 2
send message: 3
send message: 4
child process interrupt...
main process exit
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。