Prometheus source code analysis
config
GlobalConfig
ScrapeInterval
ScrapeTimeout
EvaluationInterval
ExternalLabels
AlertingConfig
ScrapeConfig
RemoteWriteConfig
RemoteReadConfig
The configuration file passed on the command line (the --config.file flag) is parsed by the following method in config.go:
// LoadFile parses the given YAML file into a Config.
func LoadFile(filename string) (*Config, error) {
    content, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, err
    }
    cfg, err := Load(string(content))
    if err != nil {
        return nil, errors.Wrapf(err, "parsing YAML file %s", filename)
    }
    resolveFilepaths(filepath.Dir(filename), cfg)
    return cfg, nil
}
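For illustration, here is a minimal, hypothetical caller of Load, the function LoadFile delegates to after reading the file; the YAML maps onto the GlobalConfig fields listed above:

// Hypothetical usage sketch of config.Load (the function LoadFile wraps above).
package main

import (
    "fmt"

    "github.com/prometheus/prometheus/config"
)

func main() {
    cfg, err := config.Load(`
global:
  scrape_interval: 15s
  evaluation_interval: 30s
scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']
`)
    if err != nil {
        panic(err)
    }
    // GlobalConfig carries the defaults (ScrapeInterval, EvaluationInterval, ...).
    fmt.Println(cfg.GlobalConfig.ScrapeInterval, len(cfg.ScrapeConfigs))
}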
promql
discovery
discovery/config/config.go
Defines every service-discovery mechanism that can appear in a scrape configuration and implements the Validate() method.
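A heavily abridged sketch of that struct and its Validate() method follows; only a few mechanisms are shown, and the field names are taken from the 2.x tree this walkthrough follows, so treat the details as assumptions:

// Abridged sketch of discovery/config/config.go: one optional slice per mechanism.
type ServiceDiscoveryConfig struct {
    StaticConfigs       []*targetgroup.Group   `yaml:"static_configs,omitempty"`
    DNSSDConfigs        []*dns.SDConfig        `yaml:"dns_sd_configs,omitempty"`
    FileSDConfigs       []*file.SDConfig       `yaml:"file_sd_configs,omitempty"`
    KubernetesSDConfigs []*kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
    // ... consul, ec2, azure, gce, marathon, nerve, serverset, triton, openstack ...
}

// Validate rejects configurations that contain an empty/null SD section.
func (c *ServiceDiscoveryConfig) Validate() error {
    for _, cfg := range c.DNSSDConfigs {
        if cfg == nil {
            return errors.New("empty or null section in dns_sd_configs")
        }
    }
    // ... identical nil checks for every other mechanism ...
    return nil
}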
Kubernetes service discovery
// SDConfig is the configuration for Kubernetes service discovery.
type SDConfig struct {
    APIServer          config_util.URL              `yaml:"api_server,omitempty"`
    Role               Role                         `yaml:"role"`
    HTTPClientConfig   config_util.HTTPClientConfig `yaml:",inline"`
    NamespaceDiscovery NamespaceDiscovery           `yaml:"namespaces,omitempty"`
}
DNS based service discovery
// SDConfig is the configuration for DNS based service discovery.
type SDConfig struct {
    Names           []string       `yaml:"names"`
    RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
    Type            string         `yaml:"type"`
    Port            int            `yaml:"port"` // Ignored for SRV records
}
Zookeeper based discovery
// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
type NerveSDConfig struct {
    Servers []string       `yaml:"servers"`
    Paths   []string       `yaml:"paths"`
    Timeout model.Duration `yaml:"timeout,omitempty"`
}
manager.go
Manages and updates Discoverer instances.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error
ApplyConfig removes all running discovery providers and starts new ones using the provided configuration.
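Its flow is roughly the following. This is a simplified sketch; helper names such as cancelDiscoverers and registerProviders follow the upstream 2.x sources and may vary by version:

// Simplified sketch of Manager.ApplyConfig.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    // Stop every Discoverer that is currently running.
    m.cancelDiscoverers()

    // Register one provider per mechanism found in the new configuration...
    for name, scfg := range cfg {
        m.registerProviders(scfg, name)
    }
    // ...and start them all again.
    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }
    return nil
}

Each provider is started via startProvider, which wires the Discoverer's update channel to the manager's updater goroutine: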
func (m *Manager) startProvider(ctx context.Context, p *provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)

    m.discoverCancel = append(m.discoverCancel, cancel)

    go p.d.Run(ctx, updates)
    go m.updater(ctx, p, updates)
}
Zookeeper implementation of the Discoverer interface:
// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    defer func() {
        for _, tc := range d.treeCaches {
            tc.Stop()
        }
        // Drain event channel in case the treecache leaks goroutines otherwise.
        for range d.updates {
        }
        d.conn.Close()
    }()

    for {
        select {
        case <-ctx.Done():
            return
        case event := <-d.updates:
            tg := &targetgroup.Group{
                Source: event.Path,
            }
            if event.Data != nil {
                labelSet, err := d.parse(*event.Data, event.Path)
                if err == nil {
                    tg.Targets = []model.LabelSet{labelSet}
                    d.sources[event.Path] = tg
                } else {
                    delete(d.sources, event.Path)
                }
            } else {
                delete(d.sources, event.Path)
            }
            select {
            case <-ctx.Done():
                return
            case ch <- []*targetgroup.Group{tg}:
            }
        }
    }
}
target
In main.go,
scrapeManager.Run(discoveryManagerScrape.SyncCh())
feeds the target Groups discovered by discovery/manager.go into the scrape manager.
// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
    return m.syncCh
}
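On the consuming side, scrape.Manager.Run drains this channel and rebuilds the scrape pools whenever a new target-group map arrives. A simplified sketch of that loop; method and field names are recalled from the 2.x sources and may differ slightly:

// Simplified sketch of scrape.Manager.Run: apply every target-group update
// by (re)creating one scrapePool per job and syncing its targets.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    for {
        select {
        case ts := <-tsets:
            m.reload(ts)
        case <-m.graceShut:
            return nil
        }
    }
}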
scrape.go
Fetches the metric data from each target via targetScraper and writes it into the TSDB.
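Conceptually, each scrape is just an HTTP GET of the target's /metrics endpoint whose body is parsed and appended to storage. The following is not the Prometheus code itself, only a self-contained illustration of what a single scrape boils down to:

// Minimal, self-contained illustration of one scrape: fetch /metrics over HTTP.
// (Prometheus' targetScraper additionally sets Accept/timeout headers and hands
// the body to a parser and a storage appender.)
package main

import (
    "fmt"
    "io"
    "net/http"
    "time"
)

func scrapeOnce(target string) (string, error) {
    client := &http.Client{Timeout: 10 * time.Second}
    resp, err := client.Get(target + "/metrics")
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    return string(body), nil
}

func main() {
    body, err := scrapeOnce("http://localhost:9090")
    if err != nil {
        fmt.Println("scrape failed:", err)
        return
    }
    fmt.Println(len(body), "bytes of metrics scraped")
}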
storage
localStorage
localStorage = &tsdb.ReadyStorage{}
remoteStorage
remoteStorage = remote.NewStorage(
    log.With(logger, "component", "remote"),
    prometheus.DefaultRegisterer,
    localStorage.StartTime,
    cfg.localStoragePath,
    time.Duration(cfg.RemoteFlushDeadline),
)
fanout.go
A read/write proxy over the local and remote storages.
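In main.go the two storages above are combined into a single fanout storage. The call below is a sketch based on the 2.x main.go; treat the exact signature as an assumption:

// Sketch: reads are merged across all backends, while writes go to the primary
// (the local TSDB) and are then fanned out to the secondaries (remote write).
fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)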
template
Creates a new TemplateExpander, which fills HTML templates with data.
// NewTemplateExpander returns a template expander ready to use.
func NewTemplateExpander(
    ctx context.Context,
    text string,
    name string,
    data interface{},
    timestamp model.Time,
    queryFunc QueryFunc,
    externalURL *url.URL,
) *Expander
On using Go html templates: https://www.cnblogs.com/aresxin/p/GO-yu-yanhtml-mo-ban.html
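For reference, the underlying mechanism is the standard library's html/template; a minimal, self-contained example (unrelated to Prometheus' own template code):

// Minimal html/template example: fill an HTML snippet with data,
// with context-aware escaping applied automatically.
package main

import (
    "html/template"
    "os"
)

func main() {
    const page = `<h1>{{.Title}}</h1><ul>{{range .Alerts}}<li>{{.}}</li>{{end}}</ul>`

    tmpl := template.Must(template.New("page").Parse(page))
    data := struct {
        Title  string
        Alerts []string
    }{
        Title:  "Firing alerts",
        Alerts: []string{"HighCPU", "DiskFull <script>"}, // the tag is escaped in the output
    }
    if err := tmpl.Execute(os.Stdout, data); err != nil {
        panic(err)
    }
}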
util
notifier
Startup flow (main.go)
Component initialization
Storage components (localStorage / remoteStorage / fanoutStorage)
notifier component: maintains an internal alert queue whose capacity is set by the --alertmanager.notification-queue-capacity flag (default 10000)
discoveryManagerScrape component: discovers the services whose metrics are to be scraped
discoveryManagerNotify component: discovers the alert-notification (Alertmanager) services
scrapeManager component: manages metric scraping and writes the scraped samples into fanoutStorage
queryEngine component: the rule/query engine
ruleManager component: runs the whole flow from rule evaluation to alert dispatch
web component
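A rough sketch of how several of these components are constructed in main.go; constructor names follow the 2.x sources, but the exact arguments vary between releases and should be treated as assumptions:

// Abridged wiring sketch from cmd/prometheus/main.go (2.x-era).
notifierManager := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

discoveryManagerScrape := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"))
discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"))

scrapeManager := scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)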
Component reloading: reloaders defines the reload functions for the individual components:
reloaders := []func(cfg *config.Config) error{
    remoteStorage.ApplyConfig,
    webHandler.ApplyConfig,
    // The Scrape and notifier managers need to reload before the Discovery manager as
    // they need to read the most updated config when receiving the new targets list.
    scrapeManager.ApplyConfig,
    func(cfg *config.Config) error {
        c := make(map[string]sd_config.ServiceDiscoveryConfig)
        for _, v := range cfg.ScrapeConfigs {
            c[v.JobName] = v.ServiceDiscoveryConfig
        }
        return discoveryManagerScrape.ApplyConfig(c)
    },
    notifierManager.ApplyConfig,
    func(cfg *config.Config) error {
        c := make(map[string]sd_config.ServiceDiscoveryConfig)
        for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
            // AlertmanagerConfigs doesn't hold an unique identifier so we use the config hash as the identifier.
            b, err := json.Marshal(v)
            if err != nil {
                return err
            }
            c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
        }
        return discoveryManagerNotify.ApplyConfig(c)
    },
    func(cfg *config.Config) error {
        // Get all rule files matching the configuration paths.
        var files []string
        for _, pat := range cfg.RuleFiles {
            fs, err := filepath.Glob(pat)
            if err != nil {
                // The only error can be a bad pattern.
                return errors.Wrapf(err, "error retrieving rule files for %s", pat)
            }
            files = append(files, fs...)
        }
        return ruleManager.Update(
            time.Duration(cfg.GlobalConfig.EvaluationInterval),
            files,
            cfg.GlobalConfig.ExternalLabels,
        )
    },
}
Component startup: each component is first registered via Add from github.com/oklog/run; once all of them have been added, Run starts them:
func (g *Group) Add(execute func() error, interrupt func(error)) {
    g.actors = append(g.actors, actor{execute, interrupt})
}

func (g *Group) Run() error {
    if len(g.actors) == 0 {
        return nil
    }

    // Run each actor.
    errors := make(chan error, len(g.actors))
    for _, a := range g.actors {
        go func(a actor) {
            errors <- a.execute()
        }(a)
    }

    // Wait for the first actor to stop.
    err := <-errors

    // Signal all actors to stop.
    for _, a := range g.actors {
        a.interrupt(err)
    }

    // Wait for all actors to stop.
    for i := 1; i < cap(errors); i++ {
        <-errors
    }

    // Return the original error.
    return err
}
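For example, the scrape manager is registered roughly like this (simplified sketch; the execute function is the scrapeManager.Run call shown in the target section):

// Sketch: one execute/interrupt pair per component.
g.Add(
    func() error {
        // Blocks until the discovery channel is closed or Stop() is called.
        return scrapeManager.Run(discoveryManagerScrape.SyncCh())
    },
    func(err error) {
        scrapeManager.Stop()
    },
)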
They are added in the following order:
Termination handler.
discoveryManagerScrape.Run()
discoveryManagerNotify.Run()
scrapeManager.Run(discoveryManagerScrape.SyncCh())
Reload handler.
Initial configuration loading.
ruleManager.Run()
TSDB.
webHandler.Run(ctxWeb)
notifierManager.Run(discoveryManagerNotify.SyncCh())