Informer 获取K8s event 保存至ES
一 背景
Informer 是 Client-go 中的一个核心工具包。在Kubernetes源码中,如果 Kubernetes 的某个组件,需要 List/Get Kubernetes 中的 Object,在绝大多 数情况下,会直接使用Informer实例中的Lister()方法(该方法包含 了 Get 和 List 方法),而很少直接请求Kubernetes API。Informer 最基本 的功能就是List/Get Kubernetes中的 Object。
二 Informer 机制
整个架构大体分为以下几个部分:
2.1 Index
tools/cache/thread_safe_store.go中,定义了实现了线程安全的存储接口ThreadSafeStore:
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error) //传入indexName和obj,返回所有和obj有相同index key的obj
IndexKeys(indexName, indexKey string) ([]string, error) // 传入indexName和index key,返回index key指定的所有obj key
ListIndexFuncValues(name string) []string //获取indexName对应的index内的所有index key
ByIndex(indexName, indexKey string) ([]interface{}, error) //和IndexKeys方法类似,只是返回的是index key指定的所有obj
GetIndexers() Indexers //返回目前所有的indexers
AddIndexers(newIndexers Indexers) error //存储数据前调用,添加indexer
Resync() error // Resync is a no-op and is deprecated
}
结构体threadSafeMap将资源对象数据存储于一个内存中的map数据结构中:
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{} //实际存所有资源对象的地方
indexers Indexers
indices Indices
}
每次的增、删、改、查操作都会都会加锁,以保证数据的一致性。
k8s.io/client-go/tools/cache/store.go中,定义了存储接口Store:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
在tools/cache/index.go中,定义了Indexer接口:
type Indexer interface {
Store // 继承接口Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
还定义了一些数据结构:
type IndexFunc func(obj interface{}) ([]string, error) // 计算资源对象的index key的函数类型,值得注意的是,返回的是多个index key组成的列表
type Indexers map[string]IndexFunc // 计算索引的方法不止一个,通过给他们命名来加以区别,存储索引名(name)与索引方法(IndexFunc)的映射
type Indices map[string]Index // 索引名(name)与索引(index)的映射
type Index map[string]sets.String // 索引键(index key)与值(Obj Key)列表的映射
它们之间的关系如图:
具体实现在tools/cache/store.go中的cache结构体:
`type` `cache ``struct` `{ `` ``cacheStorage ThreadSafeStore ``//cacheStorage是一个ThreadSafeStore类型的对象,实际使用的是threadSafeMap类型`` ``keyFunc KeyFunc ``//用于计算资源对象的index key``}``type` `KeyFunc ``func``(obj ``interface``{}) (string, error)`
cache结构体封装了threadSafeMap的很多方法,对外提供了Add、Update、Delete等方法;Indexer接口中规定需要实现的那些方法都是调用的threadSafeMap的实现
通过cache.NewIndexer(keyFunc, Indexers)初始化Indexer对象
keyFunc:k8s内部目前使用的自定义的indexFunc有PodPVCIndexFunc 、indexByPodNodeName 、MetaNamespaceIndexFunc
默认使用MetaNamespaceKeyFunc:根据资源对象计算出<namespace>/<name>格式的key,如果资源对象的<namespace>为空,则<name>作为key
Indexers:通过NewThreadSafeStore(indexers, Indices{})得到结构体内的cacheStorage
// 定义一个IndexFunc,功能为:根据Annotations的users字段返回index key
func UsersIndexFunc(obj interface{}) ([]string, error){
pod := obj.(*v1.Pod)
usersString := pod.Annotations["users"]
return strings.Split(usersString, ","), nil
}
func main() {
index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser":UsersIndexFunc})
pod1 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"one",Annotations:map[string]string{"users":"ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"two",Annotations:map[string]string{"users":"bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"three",Annotations:map[string]string{"users":"ernie,elmo"}}}
//添加3个Pod资源对象
index.Add(pod1)
index.Add(pod2)
index.Add(pod3)
//通过index.ByIndex函数(通过执行索引器函数得到索引结果)查询byUser索引器下匹配ernie字段的Pod列表
erniePods, err := index.ByIndex("byUser","ernie")
if err != nil{
panic(err)
}
for _, erniePods := range erniePods{
fmt.Println(erniePods.(*v1.Pod).Name)
}
}
2.2 DeltaFIFO
tools/cache/delta_fifo.go中定义了DeltaFIFO。Delta代表变化, FIFO则是先入先出的队列。
DeltaFIFO将接受来的资源event,转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。
DeltaType是string的别名,代表一种变化:
DeltaType是string的别名,代表一种变化:
`type` `DeltaType string`
类型定义:
`const` `(`` ``Added DeltaType = ``"Added"`` ``Updated DeltaType = ``"Updated"`` ``Deleted DeltaType = ``"Deleted"`` ``Replaced DeltaType = “Replaced” ``// 替换,list出错时,会触发relist,此时会替换`` ``Sync DeltaType = “Sync” ``// 周期性的同步,底层会当作一个update类型处理``)`
Delta由变化类型+资源对象组成:
type Delta struct {
Type DeltaType
Object interface{}
}
Deltas是[]delta切片:
type Deltas []Delta
DeltaFIFO的定义:
type DeltaFIFO struct {
lock sync.RWMutex //读写锁
cond sync.Cond //条件变量
items map[string]Deltas //通过map数据结构的方式存储,value存储的是对象的Deltas数组
queue []string //存储资源对象的key,该key通过KeyOf(obj)函数计算得到
populated bool //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
initialPopulationCount int //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
keyFunc KeyFunc
knownObjects KeyListerGetter //indexer
closed bool
closedLock sync.Mutex
emitDeltaTypeReplaced bool
}
向队列里添加元素:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj) //获取obj key
if err != nil {
return KeyError{obj, err}
}
//向items中添加delta,并对操作进行去重,目前来看,只有连续两次操作都是删除操作的情况下,才可以合并,其他操作不会合并
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
//向queue和items中添加元素,添加以后,条件变量发出消息,通知可能正在阻塞的POP方法有事件进队列了
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// 冗余判断,其实是不会走到这个分支的,去重后的delta list长度怎么也不可能小于1
delete(f.items, id)
}
return nil
}
从队列里Pop元素:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 { // 如果队列是空的,利用条件变量阻塞住,直到有新的delta
if f.IsClosed() { // 如果Close()被调用,则退出
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
// 如果处理失败了,调用addIfNotPresent:如果queue中没有则添加。本身刚刚从queue和items中取出对象,应该不会存在重复的对象,这里调用addIfNotPresent应该只是为了保险起见
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
2.3 Reflector
tools/cache/reflector.go中定义了Reflector:
type Reflector struct {
name string
expectedTypeName string //被监控的资源的类型名
expectedType reflect.Type // 监控的对象类型
expectedGVK *schema.GroupVersionKind
store Store // 存储,就是Delta_FIFO,这里的Store类型实际是Delta_FIFO的父类
listerWatcher ListerWatcher // 用来进行list&watch的接口对象
backoffManager wait.BackoffManager
resyncPeriod time.Duration //重新同步的周期
ShouldResync func() bool //周期性的判断是否需要重新同步
clock clock.Clock //时钟对象,主要是为了给测试留后门,方便修改时间
……
}
同一类资源Informer共享一个Reflector。Reflector通过ListAndWatch函数来ListAndWatch apiserver来获取资源的数据。
获取时需要基于ResourceVersion(Etcd生成的全局唯一且递增的资源版本号)。通过此序号,客户端可以知道目前与服务端信息同步的状态,每次只取大于等于本地序号的事件。好处是可以实现事件的全局唯一,实现”断点续传“功能,不用担心本地客户端偶尔出现的网络异常
ListAndwatch是k8s统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性、性能等,为声明式风格的API奠定了良好的基础,是k8s架构的精髓。
List在Controller重启或Watch中断的情况下,调用资源的list API罗列资源对象以进行全量更新,基于HTTP短链接实现
(1)r.listerWatcher.List用于获取资源下的所有对象的数据,例如,获取所有Pod的资源数据。获取资源数据是由options的ResourceVersion控制的。如果ResourceVersion为0,则表示获取所有Pod的资源数据;如果ResourceVersion非0,则表示根据资源版本号继续获取。
(2)listMetaInterface.GetResourceVersion用于获取资源版本号。
(3)meta.ExtractList用于将资源数据(runtime.Object对象)转换成资源对象列表([]runtime.Object对象)。
因为r.listerWatcher.List获取的是资源下的所有对象的数据,例如所有的Pod资源数据,所以它是一个资源列表。
(4)r.syncWith用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并会替换已存在的对象。
(5)r.setLastSyncResourceVersion用于设置最新的资源版本号。
Watch则在多次List之间进行,调用资源的watch API,基于当前的资源版本号监听资源变更(如Added、Updated、Deleted)事件。
通过在Http请求中带上watch=true,表示采用Http长连接持续监听apiserver发来的资源变更事件。
apiserver在response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码。每当有事件来临,返回一个WatchEvent。
Reflector在获取新的资源数据后,调用的Add方法将资源对象的Delta记录存放到本地缓存DeltaFIFO中。
2.4 Controller
在tool/cache/controller.go中定义了Controller接口:
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
controller结构体实现了此接口:
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
config结构体中是所有配置:
type Config struct {
Queue //DeltaFIFO
ListerWatcher
Process ProcessFunc //从DeltaFIFO Pop调用时,调用的回调
ObjectType runtime.Object //期待处理的资源对象的类型
FullResyncPeriod time.Duration //全量resync的周期
ShouldResync ShouldResyncFunc //delta fifo周期性同步判断时使用
RetryOnError bool
}
Controller的processLoop方法会不断地调用的Pop方法从Delta队列中消费弹出delta记录(队列中没有数据时阻塞等待数据):
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
Pop方法须传入Process函数——用于接收并处理对象的回调方法,默认的Process函数是Informer模块中的HandleDeltas
2.5 informer
Kubernetes的其他组件都是通过client-go的Informer机制与Kubernetes API Server进行通信的。
Informer也被称为Shared Informer,它是可以共享使用的。
在clientgo的informer/factory.go中,有接口定义:
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
// 所有已知资源的shared informer
Admissionregistration() admissionregistration.Interface
Apps() apps.Interface
Auditregistration() auditregistration.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Settings() settings.Interface
Storage() storage.Interface
}
sharedInformerFactory结构体实现了此接口:
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool //用于追踪哪种informer被启动了,避免同一资源的Informer被实例化多次,运行过多相同的ListAndWatch
}
新建一个sharedInformerFactory结构体:
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
第1个参数是用于与Kubernetes API Server交互的客户端;第2个参数用于设置多久进行一次resync(周期性的List操作),如果该参数为0,则禁用resync功能。
sharedInformerFactory结构体实现了所有已知资源的shared informer,例如在clientgo的informer/core/vi/pod.go中,定义了如下接口:
type PodInformer interface{
Informer() cache.SharedIndexInformer
Listen() v1.PodLister
}
podInformer结构体实现了Informer方法和Listen方法:
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) // 如果已经存在同类型的资源Informer,则返回当前Informer,不再继续添加
}
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
通过调用sharedInformers.Core().V1().Pods()获得podInformer结构体
得到具体Pod资源的informer对象:
informer := sharedInformers.Core().V1().Pods().Informer()
最终获得的,是clientgo/tool/cache/shared_informer.go中的sharedIndexInformer结构体,它实现的接口为:
type SharedIndexInformer interface {
SharedInformer
AddIndexers(indexers Indexers) error //启动informer前为其添加indexers
GetIndexer() Indexer
}
它的定义为:
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}
通过informer.AddEventHandler函数可以为资源对象添加资源事件回调方法,支持3种资源事件回调方法:AddFunc、UpdateFunc、DeleteFunc
sharedIndexInformer结构体定义了HandleDeltas函数,作为process回调函数(通过Config结构体传给controller)
当资源对象的操作类型为Added、Updated、Deleted时,会将该资源对象存储至Indexer,并通过distribute函数将资源对象分发至用户自定义的事件处理函数(通过informer.AddEventHandler添加)中
通过informer.Run(stopCH)运行该informer,它是一个持久化的goroutine,通过clientset对象与apiserver交互。
它会启动controller,启动时传入的Config结构体包含了
stopCH用于在程序进程退出前通知Informer退出
调用Pod的Informer的示例:
stopCH := make(chan struct{})
defer close(stopCH)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ //为Pod资源添加资源事件回调方法
AddFunc: func(obj interface{}){
mObj := obj.(v1.Object)
log.Print("创建新Pod:",mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}){
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Print(oObj.GetName(),",",nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj :=obj.(v1.Object)
log.Print("删除旧Pod:",mObj.GetName())
},
})
informer.Run(stopCH)
2.6 Work Queue(选用)
在开发并行程序时,需要频繁的进行数据同步,本身golang拥有channel 机制,但不能满足一些复杂场景的需求。例如:延时队列、限速队列。
client-go中提供了多种队列以供选择,可以胜任更多的场景。工作队列会对存储的对象进行去重,从而避免多个woker 处理同一个资源的情况。
用户可以在回调函数里,将资源对象推送到WorkQueue(或其他队列)中,也可以直接处理。
三 本地启动es
docker run --name es01 -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" elasticsearch:latest
# 使用head客户端链接
docker pull mobz/elasticsearch-head:5
# 启动header 容器
docker run -d --name my-es_admin -p 9100:9100 mobz/elasticsearch-head:5
启动后,es正常,为了更方便操作es,使用head插件来,插件链接异常,需要修改es配置,并重启容器
# 进入容器
$ docker exec -it es01 /bin/bash
# 进入容器后设置参数
# http.cors.enabled: true
# http.cors.allow-origin: "*"
echo 'http.cors.enabled: true' >> config/elasticsearch.yml
echo 'http.cors.allow-origin: "*"' >> config/elasticsearch.yml
# 设置完成,退出后重启容器
docker restart es01
修改配置重启后,可以已经可以通过head组件正常链接es集群
![image-20210927143317543](/Users/xuel/Library/Application Support/typora-user-images/image-20210927143317543.png)
创建索引:
三 代码
import (
"context"
"encoding/json"
"fmt"
"gopkg.in/olivere/elastic.v5"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"log"
"math/rand"
"os"
"time"
)
var client *elastic.Client
var host = "http://127.0.0.1:9200/"
//初始化
func init() {
errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
// //这个地方有个小坑 不加上elastic.SetSniff(false) 会连接不上
client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetErrorLog(errorlog), elastic.SetURL(host))
if err != nil {
panic(err)
}
info, code, err := client.Ping(host).Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
esversion, err := client.ElasticsearchVersion(host)
if err != nil {
panic(err)
}
fmt.Printf("Elasticsearch version %s\n", esversion)
}
func mustSuccess(err error) {
if err != nil {
panic(err)
}
}
func main() {
rand.Seed(time.Now().UnixNano())
config, err := clientcmd.BuildConfigFromFlags("", "/Users/xuel/.kube/config")
mustSuccess(err)
clientset, err := kubernetes.NewForConfig(config)
mustSuccess(err)
sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
stopChan := make(chan struct{})
defer close(stopChan)
eventInformer := sharedInformers.Events().V1beta1().Events().Informer()
addChan := make(chan v1beta1.Event)
deleteChan := make(chan v1beta1.Event)
eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
mustSuccess(err)
event := &v1beta1.Event{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event)
mustSuccess(err)
addChan <- *event
},
UpdateFunc: func(oldObj, newObj interface{}) {
},
DeleteFunc: func(obj interface{}) {
},
}, 0)
go func() {
for {
select {
case event := <-addChan:
str, err := json.Marshal(&event)
mustSuccess(err)
fmt.Printf("插入k8s事件内容:%s", string(str))
esinsert(str)
break
case <-deleteChan:
break
}
}
}()
eventInformer.Run(stopChan)
}
func esinsert(str []byte) {
index := "k8s_informer"
dbtype := "doc"
put1, err := client.Index().
Index(index).
Type(dbtype).
Id("1").BodyString(string(str)).
Do(context.Background())
if err != nil {
fmt.Println("Insert es error: %s", err)
}
fmt.Println("insert success", put1)
}
四 测试
插入数据后使用es查询:
curl -H "Content-Type: application/json" -XGET 'http://127.0.0.1:9200/k8s_informer/doc/_search?pretty' -d '{"query":{"match_all":{}}}'
触发k8s事件,会自动记录下来
参考链接
- 点赞
- 收藏
- 关注作者
评论(0)