k8s 中定时任务的实现

举报
技术火炬手 发表于 2019/02/19 17:16:30 2019/02/19
【摘要】 k8s 中有许多优秀的包都可以在平时的开发中借鉴与使用,比如,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,可以直接引用。

k8s 中有许多优秀的包都可以在平时的开发中借鉴与使用,比如,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,可以直接引用。本篇文章会介绍 k8s 中定时任务的实现,k8s 中定时任务都是通过 wait 包实现的,wait 包在 k8s 的多个组件中都有用到,以下是 wait 包在 kubelet 中的几处使用:

func run(s options.KubeletServer, kubeDeps kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
...
// kubelet 每5分钟一次从 apiserver 获取证书
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
if err != nil {
return err
}

    closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)    if err != nil {        return err
    }
    ...

}

...

func startKubelet(k kubelet.Bootstrap, podCfg config.PodConfig, kubeCfg kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// 持续监听 pod 的变化
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
...
}
golang 中可以通过 time.Ticker 实现定时任务的执行,但在 k8s 中用了更原生的方式,使用 time.Timer 实现的。time.Ticker 和 time.Timer 的使用区别如下:

func (t *Timer) Reset(d Duration) bool
一个示例:

package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup

timer1 := time.NewTimer(2 * time.Second)
ticker1 := time.NewTicker(2 * time.Second)

wg.Add(1)go func(t *time.Ticker) {
    defer wg.Done()    for {
        <-t.C
        fmt.Println("exec ticker", time.Now().Format("2006-01-02 15:04:05"))
    }
}(ticker1)

wg.Add(1)go func(t *time.Timer) {
    defer wg.Done()    for {
        <-t.C
        fmt.Println("exec timer", time.Now().Format("2006-01-02 15:04:05"))
        t.Reset(2 * time.Second)
    }
}(timer1)

wg.Wait()

}
一、wait 包中的核心代码
核心代码(k8s.io/apimachinery/pkg/util/wait/wait.go):

func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer
var sawTimeout bool

for {    select {    case <-stopCh:        return
    default:
    }

    jitteredPeriod := period    if jitterFactor > 0.0 {
        jitteredPeriod = Jitter(period, jitterFactor)
    }    if !sliding {
        t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
    }

    func() {
        defer runtime.HandleCrash()
        f()
    }()    if sliding {
        t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
    }    select {    case <-stopCh:        return
    case <-t.C:
        sawTimeout = true
    }
}

}

...

func resetOrReuseTimer(t time.Timer, d time.Duration, sawTimeout bool) time.Timer {
if t == nil {
return time.NewTimer(d)
}
if !t.Stop() && !sawTimeout {
<-t.C
}
t.Reset(d)
return t
}
几个关键点的说明:

1、如果 sliding 为 true,则在 f() 运行之后计算周期。如果为 false,那么 period 包含 f() 的执行时间。
2、在 golang 中 select 没有优先级选择,为了避免额外执行 f(),在每次循环开始后会先判断 stopCh chan。
k8s 中 wait 包其实是对 time.Timer 做了一层封装实现。

二、wait 包常用的方法
1、定期执行一个函数,永不停止,可以使用 Forever 方法:
func Forever(f func(), period time.Duration)

2、在需要的时候停止循环,那么可以使用下面的方法,增加一个用于停止的 chan 即可,方法定义如下:

func Until(f func(), period time.Duration, stopCh <-chan struct{})

上面的第三个参数 stopCh 就是用于退出无限循环的标志,停止的时候我们 close 掉这个 chan 就可以了。

3、有时候,我们还会需要在运行前去检查先决条件,在条件满足的时候才去运行某一任务,这时候可以使用 Poll 方法:

func Poll(interval, timeout time.Duration, condition ConditionFunc)

这个函数会以 interval 为间隔,不断去检查 condition 条件是否为真,如果为真则可以继续后续处理;如果指定了 timeout 参数,则该函数也可以只常识指定的时间。

4、PollUntil 方法和上面的类似,但是没有 timeout 参数,多了一个 stopCh 参数,如下所示:
PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

此外还有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。

三、总结
本篇文章主要讲了 k8s 中定时任务的实现与对应包(wait)中方法的使用。通过阅读 k8s 的源代码,可以发现 k8s 中许多功能的实现也都是我们需要在平时工作中用的,其大部分包的性能都是经过大规模考验的,通过使用其相关的工具包不仅能学到大量的编程技巧也能避免自己造轮子。
加群链接:https://jq.qq.com/?_wv=1027&k=5dPqXGI

群内提供免费的架构资料还有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的免费直播讲解 可以进来一起学习交流哦

直播课堂地址:https://ke.qq.com/course/260263?flowToken=1007014

-------------------------------

本文转自UMUTech博客51CTO博客

如需转载,请联系作者授权

原文链接:http://blog.51cto.com/14158311/2351242

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。