Prometheus Source Code Analysis
config
GlobalConfig
  ScrapeInterval
  ScrapeTimeout
  EvaluationInterval
  ExternalLabels
AlertingConfig
ScrapeConfig
RemoteWriteConfig
RemoteReadConfig
The value of the config file flag (--config.file) given on the command line is parsed by LoadFile in config.go, shown after the struct sketch below.
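A rough sketch of how these fields nest (condensed from config/config.go of the same era as the snippets in this post; exact field types vary slightly between Prometheus versions):

// Config is the top-level configuration parsed from the YAML file.
type Config struct {
    GlobalConfig   GlobalConfig    `yaml:"global"`
    AlertingConfig AlertingConfig  `yaml:"alerting,omitempty"`
    RuleFiles      []string        `yaml:"rule_files,omitempty"`
    ScrapeConfigs  []*ScrapeConfig `yaml:"scrape_configs,omitempty"`

    RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
    RemoteReadConfigs  []*RemoteReadConfig  `yaml:"remote_read,omitempty"`
}

// GlobalConfig holds the defaults shared by the other configuration blocks.
type GlobalConfig struct {
    ScrapeInterval     model.Duration `yaml:"scrape_interval,omitempty"`
    ScrapeTimeout      model.Duration `yaml:"scrape_timeout,omitempty"`
    EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
    ExternalLabels     model.LabelSet `yaml:"external_labels,omitempty"`
}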
// LoadFile parses the given YAML file into a Config.
func LoadFile(filename string) (*Config, error) {
content, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
cfg, err := Load(string(content))
if err != nil {
return nil, errors.Wrapf(err, "parsing YAML file %s", filename)
}
resolveFilepaths(filepath.Dir(filename), cfg)
return cfg, nil
}
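A minimal usage sketch of LoadFile (the file name here is only an example):

package main

import (
    "fmt"
    "log"

    "github.com/prometheus/prometheus/config"
)

func main() {
    // Parse and validate the YAML configuration, e.g. the file passed via --config.file.
    cfg, err := config.LoadFile("prometheus.yml") // example path
    if err != nil {
        log.Fatalln("loading config:", err)
    }
    fmt.Println("scrape interval:", cfg.GlobalConfig.ScrapeInterval)
    for _, sc := range cfg.ScrapeConfigs {
        fmt.Println("scrape job:", sc.JobName)
    }
}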
promql

discovery
discovery/config/config.go defines ServiceDiscoveryConfig, which lists every service discovery mechanism that may appear in a scrape configuration, and implements the Validate() method for it. A trimmed sketch of that struct follows.
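Only some of the mechanisms are shown here; see discovery/config/config.go for the full list:

// ServiceDiscoveryConfig bundles the lists of all supported SD mechanisms
// that can appear in a scrape (or alerting) configuration. Trimmed sketch.
type ServiceDiscoveryConfig struct {
    // List of labeled statically configured targets.
    StaticConfigs []*targetgroup.Group `yaml:"static_configs,omitempty"`
    // List of DNS service discovery configurations.
    DNSSDConfigs []*dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
    // List of file service discovery configurations.
    FileSDConfigs []*file.SDConfig `yaml:"file_sd_configs,omitempty"`
    // List of Kubernetes service discovery configurations.
    KubernetesSDConfigs []*kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
    // List of AirBnB Nerve (Zookeeper) service discovery configurations.
    NerveSDConfigs []*zookeeper.NerveSDConfig `yaml:"nerve_sd_configs,omitempty"`
    // ... Consul, EC2, Azure, GCE, Marathon, Triton, OpenStack configs omitted.
}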
Kubernetes service discovery
// SDConfig is the configuration for Kubernetes service discovery.
type SDConfig struct {
APIServer config_util.URL `yaml:"api_server,omitempty"`
Role Role `yaml:"role"`
HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"`
NamespaceDiscovery NamespaceDiscovery `yaml:"namespaces,omitempty"`
}

DNS based service discovery
// SDConfig is the configuration for DNS based service discovery.
type SDConfig struct {
Names []string `yaml:"names"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
Type string `yaml:"type"`
Port int `yaml:"port"` // Ignored for SRV records
}

Zookeeper based discovery
// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
type NerveSDConfig struct {
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout model.Duration `yaml:"timeout,omitempty"`
}

manager.go is responsible for managing and updating Discoverer instances.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error
ApplyConfig removes all running discovery providers and starts new ones with the provided configuration.
func (m *Manager) startProvider(ctx context.Context, p *provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel)
go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates)
}
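The updater goroutine started above drains the provider's update channel and records the groups for every scrape job subscribed to that provider. A condensed sketch of that loop (helper names such as updateGroup, poolKey and triggerSend follow the upstream manager.go of this era; metrics and logging are omitted):

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            if !ok {
                // The discoverer closed its channel.
                return
            }
            // Record the latest groups for every subscriber (scrape job) of this provider.
            for _, s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }
            // Non-blocking signal that an updated target set should be pushed out on SyncCh.
            select {
            case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}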
Discoverer: Zookeeper
// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer func() {
for _, tc := range d.treeCaches {
tc.Stop()
}
// Drain event channel in case the treecache leaks goroutines otherwise.
for range d.updates {
}
d.conn.Close()
}()
for {
select {
case <-ctx.Done():
return
case event := <-d.updates:
tg := &targetgroup.Group{
Source: event.Path,
}
if event.Data != nil {
labelSet, err := d.parse(*event.Data, event.Path)
if err == nil {
tg.Targets = []model.LabelSet{labelSet}
d.sources[event.Path] = tg
} else {
delete(d.sources, event.Path)
}
} else {
delete(d.sources, event.Path)
}
select {
case <-ctx.Done():
return
case ch <- []*targetgroup.Group{tg}:
}
}
}
}

target
In main.go,
scrapeManager.Run(discoveryManagerScrape.SyncCh())
hands the Groups discovered by discovery/manager.go over to the scrape manager.
// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
return m.syncCh
}
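On the receiving side, scrape.(*Manager).Run consumes this channel and reloads its scrape pools whenever a new target set arrives. A condensed sketch (field and helper names approximate the scrape/manager.go of this era; newer versions throttle the reloads):

func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    for {
        select {
        case ts := <-tsets:
            // Create or update one scrape pool per job and apply its target groups.
            m.reload(ts)
        case <-m.graceShut:
            return nil
        }
    }
}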
scrape.go then performs the actual scraping: targetScraper fetches the metric data and the scrape loop writes it into the TSDB.

storage
localStorage
localStorage = &tsdb.ReadyStorage{}

remoteStorage
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))

fanout.go acts as the read/write proxy in front of the local and remote storage.
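In main.go the two are then combined into a single fanoutStorage (variable names as in cmd/prometheus/main.go):

// Writes go to the local TSDB (primary) and then to the remote-write queues;
// reads are merged across both.
fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)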
template
Creates a new TemplateExpander and uses the supplied data to fill in HTML templates.
// NewTemplateExpander returns a template expander ready to use.
func NewTemplateExpander(
ctx context.Context,
text string,
name string,
data interface{},
timestamp model.Time,
queryFunc QueryFunc,
externalURL *url.URL,
) *Expander
For general Go html/template usage, see https://www.cnblogs.com/aresxin/p/GO-yu-yanhtml-mo-ban.html
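A small, hypothetical usage sketch: the template text and the queryFunc/externalURL values are made up for illustration; Expand() is the Expander method that renders a text template.

// queryFunc has type template.QueryFunc: func(context.Context, string, time.Time) (promql.Vector, error).
expander := template.NewTemplateExpander(
    ctx,
    `{{ 42 | humanize }} requests`, // template text
    "example",                      // template name
    nil,                            // template data, available as $ inside the template
    model.Now(),                    // timestamp used for query evaluation
    queryFunc,                      // executes PromQL for the template's query function
    externalURL,                    // used to build links back to this Prometheus server
)
out, err := expander.Expand()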
util
notifier
Startup flow: main.go
Component initialization. The following components are created in main.go (a condensed construction sketch follows this list):
Storage components: localStorage / remoteStorage / fanoutStorage.
notifier component: maintains an internal alert queue whose capacity is set by the --alertmanager.notification-queue-capacity flag (default 10000).
discoveryManagerScrape component: discovers the services whose metrics are to be scraped.
discoveryManagerNotify component: discovers the alert notification (Alertmanager) services.
scrapeManager component: manages metric scraping and stores the scraped samples into fanoutStorage.
queryEngine component: the query engine used for rule evaluation.
ruleManager component: carries the flow from rule evaluation through to alert sending.
web component: the HTTP API and UI.
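A condensed construction sketch for a few of these components, following cmd/prometheus/main.go (argument lists trimmed; exact constructor signatures differ between Prometheus versions):

notifierManager := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

discoveryManagerScrape := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))

// The scrape manager appends the scraped samples to fanoutStorage (local TSDB + remote write).
scrapeManager := scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)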
Component reloading: reloaders defines the reload functions for the individual components.
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
webHandler.ApplyConfig,
// The Scrape and notifier managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
return discoveryManagerScrape.ApplyConfig(c)
},
notifierManager.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
b, err := json.Marshal(v)
if err != nil {
return err
}
c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return errors.Wrapf(err, "error retrieving rule files for %s", pat)
}
files = append(files, fs...)
}
return ruleManager.Update(
time.Duration(cfg.GlobalConfig.EvaluationInterval),
files,
cfg.GlobalConfig.ExternalLabels,
)
},
}
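These reloaders are invoked by reloadConfig in main.go whenever a reload is triggered (SIGHUP or the /-/reload endpoint). A condensed sketch of that function (metrics and extra logging trimmed):

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) error {
    conf, err := config.LoadFile(filename)
    if err != nil {
        return fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
    }
    failed := false
    for _, rl := range rls {
        // Apply the freshly parsed configuration to every registered component.
        if err := rl(conf); err != nil {
            level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
            failed = true
        }
    }
    if failed {
        return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
    }
    return nil
}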
Component startup: each component is first registered as an actor via Add from github.com/oklog/run, and once all actors are added they are started with the Run method.

func (g *Group) Add(execute func() error, interrupt func(error)) {
g.actors = append(g.actors, actor{execute, interrupt})
}
func (g *Group) Run() error {
if len(g.actors) == 0 {
return nil
}
// Run each actor.
errors := make(chan error, len(g.actors))
for _, a := range g.actors {
go func(a actor) {
errors <- a.execute()
}(a)
}
// Wait for the first actor to stop.
err := <-errors
// Signal all actors to stop.
for _, a := range g.actors {
a.interrupt(err)
}
// Wait for all actors to stop.
for i := 1; i < cap(errors); i++ {
<-errors
}
// Return the original error.
return err
}

The actors are added in the following order (an example Add call is shown after the list):
1. Termination handler.
2. discoveryManagerScrape.Run()
3. discoveryManagerNotify.Run()
4. scrapeManager.Run(discoveryManagerScrape.SyncCh())
5. Reload handler.
6. Initial configuration loading.
7. ruleManager.Run()
8. TSDB.
9. webHandler.Run(ctxWeb)
10. notifierManager.Run(discoveryManagerNotify.SyncCh())
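For example, the scrape discovery manager is registered roughly like this (condensed from main.go; the interrupt function cancels the manager's context so that Run returns):

g.Add(
    func() error {
        // Blocks until the discovery manager stops.
        err := discoveryManagerScrape.Run()
        level.Info(logger).Log("msg", "Scrape discovery manager stopped")
        return err
    },
    func(err error) {
        level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
        cancelScrape()
    },
)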