prometheus代码分析

举报
涩涩 发表于 2019/12/27 17:41:24 2019/12/27
【摘要】 prometheus代码分析configGlobalConfigScrapeIntervalScrapeTimeoutEvaluationIntervalExternalLabelsAlertingConfigScrapeConfigRemoteWriteConfigRemoteReadConfig 通过config.go中一下方法解析启动命令中的config-files参数值// Load...

prometheus代码分析

config

  • GlobalConfig

    • ScrapeInterval

    • ScrapeTimeout

    • EvaluationInterval

    • ExternalLabels

  • AlertingConfig

  • ScrapeConfig

  • RemoteWriteConfig

  • RemoteReadConfig 通过config.go中一下方法解析启动命令中的config-files参数值

// LoadFile parses the given YAML file into a Config.
   func LoadFile(filename string) (*Config, error) {
   	content, err := ioutil.ReadFile(filename)
   	if err != nil {
   		return nil, err
   	}
   	cfg, err := Load(string(content))
   	if err != nil {
   		return nil, errors.Wrapf(err, "parsing YAML file %s", filename)
   	}
   	resolveFilepaths(filepath.Dir(filename), cfg)
   	return cfg, nil
   }

promql

discovery

discovery\config\config.go定义所有可能出现的抓取配置mechanisms,并实现了Validate()方法

  • Kubernetes service discovery

// SDConfig is the configuration for Kubernetes service discovery.
type SDConfig struct {
	APIServer          config_util.URL              `yaml:"api_server,omitempty"`
	Role               Role                         `yaml:"role"`
	HTTPClientConfig   config_util.HTTPClientConfig `yaml:",inline"`
	NamespaceDiscovery NamespaceDiscovery           `yaml:"namespaces,omitempty"`
}
  • DNS based service discovery

// SDConfig is the configuration for DNS based service discovery.
type SDConfig struct {
	Names           []string       `yaml:"names"`
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
	Type            string         `yaml:"type"`
	Port            int            `yaml:"port"` // Ignored for SRV records
}
  • Zookeeper based discovery

// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
type NerveSDConfig struct {
	Servers []string       `yaml:"servers"`
	Paths   []string       `yaml:"paths"`
	Timeout model.Duration `yaml:"timeout,omitempty"`
}

manager.go负责Discoverer instance管理和更新
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) errorApplyConfig删除所有正在运行的发现提供程序,并使用提供的配置启动新的提供程序。

func (m *Manager) startProvider(ctx context.Context, p *provider) {
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
	ctx, cancel := context.WithCancel(ctx)
	updates := make(chan []*targetgroup.Group)

	m.discoverCancel = append(m.discoverCancel, cancel)

	go p.d.Run(ctx, updates)
	go m.updater(ctx, p, updates)
}
  • Discoverer Zookeeper

// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	defer func() {
		for _, tc := range d.treeCaches {
			tc.Stop()
		}
		// Drain event channel in case the treecache leaks goroutines otherwise.
		for range d.updates {
		}
		d.conn.Close()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case event := <-d.updates:
			tg := &targetgroup.Group{
				Source: event.Path,
			}
			if event.Data != nil {
				labelSet, err := d.parse(*event.Data, event.Path)
				if err == nil {
					tg.Targets = []model.LabelSet{labelSet}
					d.sources[event.Path] = tg
				} else {
					delete(d.sources, event.Path)
				}
			} else {
				delete(d.sources, event.Path)
			}
			select {
			case <-ctx.Done():
				return
			case ch <- []*targetgroup.Group{tg}:
			}
		}
	}
}

target

在main.go中使用

scrapeManager.Run(discoveryManagerScrape.SyncCh())

将discovery\manager.go中发现的Group

// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
	return m.syncCh
}

scrape.go 获取metric数据并写入tsdb targetScraper

storage

  • localStorage localStorage = &tsdb.ReadyStorage{}

  • remoteStorage remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
    fanout.go读写代理器

template

创建新的TemplateExpander,使用数据填充html

// NewTemplateExpander returns a template expander ready to use.
func NewTemplateExpander(
	ctx context.Context,
	text string,
	name string,
	data interface{},
	timestamp model.Time,
	queryFunc QueryFunc,
	externalURL *url.URL,
) *Expander

go html template使用 https://www.cnblogs.com/aresxin/p/GO-yu-yanhtml-mo-ban.html

util

notifier

启动流程 main.go

初始化组件

  1. 存储组件

  2. notifier组件 内部维护一个告警队列,alertmanager.notification-queue-capacity 参数设置,默认10000

  3. discoveryManageScrape组件 发现指标采集服务

  4. discoveryManageNotify组件 发现告警通知服务

  5. scrapeManager组件 管理对指标的采集并将采集的指标存储到fanoutStorage中

  6. queryengine组件 规则查询引擎

  7. ruleManager 完成规则运算到告警发送的流程

  8. web组件
    组件更新: reloaders 定义了多个组件更新fun

reloaders := []func(cfg *config.Config) error{
		remoteStorage.ApplyConfig,
		webHandler.ApplyConfig,
		// The Scrape and notifier managers need to reload before the Discovery manager as
		// they need to read the most updated config when receiving the new targets list.
		scrapeManager.ApplyConfig,
		func(cfg *config.Config) error {
			c := make(map[string]sd_config.ServiceDiscoveryConfig)
			for _, v := range cfg.ScrapeConfigs {
				c[v.JobName] = v.ServiceDiscoveryConfig
			}
			return discoveryManagerScrape.ApplyConfig(c)
		},
		notifierManager.ApplyConfig,
		func(cfg *config.Config) error {
			c := make(map[string]sd_config.ServiceDiscoveryConfig)
			for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
				// AlertmanagerConfigs doesn't hold an unique identifier so we use the config hash as the identifier.
				b, err := json.Marshal(v)
				if err != nil {
					return err
				}
				c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
			}
			return discoveryManagerNotify.ApplyConfig(c)
		},
		func(cfg *config.Config) error {
			// Get all rule files matching the configuration paths.
			var files []string
			for _, pat := range cfg.RuleFiles {
				fs, err := filepath.Glob(pat)
				if err != nil {
					// The only error can be a bad pattern.
					return errors.Wrapf(err, "error retrieving rule files for %s", pat)
				}
				files = append(files, fs...)
			}
			return ruleManager.Update(
				time.Duration(cfg.GlobalConfig.EvaluationInterval),
				files,
				cfg.GlobalConfig.ExternalLabels,
			)
		},
	}

组件服务启动: 首先组件的启动使用github.com/oklog/runAdd添加,添加完成后使用Run方法运行

func (g *Group) Add(execute func() error, interrupt func(error)) {
    g.actors = append(g.actors, actor{execute, interrupt})
}

func (g *Group) Run() error {
   if len(g.actors) == 0 {
       return nil
   }

   // Run each actor.
   errors := make(chan error, len(g.actors))
   for _, a := range g.actors {
       go func(a actor) {
           errors <- a.execute()
       }(a)
   }

   // Wait for the first actor to stop.
   err := <-errors

   // Signal all actors to stop.
   for _, a := range g.actors {
       a.interrupt(err)
   }

   // Wait for all actors to stop.
   for i := 1; i < cap(errors); i++ {
       <-errors
   }

   // Return the original error.
   return err
}

添加顺序为:

  1. Termination handler.

  2. discoveryManagerScrape.Run()

  3. discoveryManagerNotify.Run()

  4. scrapeManager.Run(discoveryManagerScrape.SyncCh())

  5. Reload handler.

  6. Initial configuration loading.

  7. ruleManager.Run()

  8. TSDB.

  9. webHandler.Run(ctxWeb)

  10. notifierManager.Run(discoveryManagerNotify.SyncCh())


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。