Ceilometer源码分析

举报
leixuewei 发表于 2020/02/28 12:10:56 2020/02/28
【摘要】 Ceilometer总体框架Ceilometer组件的服务主要用于数据采集与告警的评估上报,其所包含的服务均为这两个功能所涉及,此外ceilometer提供REST的API,用于上层运维组件的调用。其中,ceilometer的agent服务用于主动采集原始数据,notification服务用于从消息队列接收其他组件发送过来的事件消息,ceilometer的collector服务用于将采集的收...

Ceilometer总体框架


1582857928564994.png

Ceilometer组件的服务主要用于数据采集与告警的评估上报,其所包含的服务均为这两个功能所涉及,此外ceilometer提供RESTAPI,用于上层运维组件的调用。其中,ceilometeragent服务用于主动采集原始数据,notification服务用于从消息队列接收其他组件发送过来的事件消息,ceilometercollector服务用于将采集的收集从消息队列中收集下来,ceilometeralarm服务用于评估和上报告警。

Ceilometer中数据的流向如图所示:

1582857979886586.png

1Ceilometeragent服务主动调用其他组件API,或者libvirt API,或者hostos命令获取原始数据,并将其发往消息队列;

2Ceilometernotification服务通过监听消息队列对应通道,获取其他组件发送给ceilometer的事件消息

3、这些数据最终均会由ceilometercollector服务收集并通过数据库存储下来

源码分析(获取原始数据部分)

1、代码入口:ceilometer agent compute的代码入口在ceilometer.cmd.polling中,创建服务

():
    service.prepare_service()
    os_service.launch(CONFmanager.AgentManager(CONF.polling_namespacesCONF.pollster_list)).wait()

2: 读取pipeline、创建轮询任务

    .polling_manager = pipeline.setup_polling()

    .partition_coordinator.start()
    .join_partitioning_groups()

    .pollster_timers = .configure_polling_tasks()

    .init_pipeline_refresh()

计算节点调用ceilometer/agent/manage.py中的AgentManager类的start方法,

start方法调用configure_polling_tasks方法,

data = self.setup_polling_tasks()

configure_polling_tasks方法调用setup_polling_tasks方法来读取pipeline文件来建立各个监控项的轮询任务,

polling_task = self.create_polling_task()

setup_polling_tasks方法调用create_polling_task来创建轮询任务PollingTask对象,轮询任务会调用PollingTask类的poll_and_notify方法

具体实现如下

def poll_and_notify(self):
        """Polling sample and notify."""
        cache = {}
        discovery_cache = {}
        poll_history = {}
        for source_name in self.pollster_matches:
            for pollster in self.pollster_matches[source_name]:
                key = Resources.key(source_name, pollster)
                candidate_res = list(
                    self.resources[key].get(discovery_cache))
                if not candidate_res and pollster.obj.default_discovery:
                    candidate_res = self.manager.discover(
                        [pollster.obj.default_discovery], discovery_cache)
 
                # Remove duplicated resources and black resources. Using
                # set() requires well defined __hash__ for each resource.
                # Since __eq__ is defined, 'not in' is safe here.
                polling_resources = []
                black_res = self.resources[key].blacklist
                history = poll_history.get(pollster.name, [])
                for x in candidate_res:
                    if x not in history:
                        history.append(x)
                        if x not in black_res:
                            polling_resources.append(x)
                poll_history[pollster.name] = history
 
                # If no resources, skip for this pollster
                if not polling_resources:
                    p_context = 'new ' if history else ''
                    LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
                               "resources found this cycle"),
                             {'name': pollster.name, 'p_context': p_context})
                    continue
 
                LOG.info(_("Polling pollster %(poll)s in the context of "
                           "%(src)s"),
                         dict(poll=pollster.name, src=source_name))
                try:
                    samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )
                    sample_batch = []
 
                    # filter None in samples
                    samples = [s for s in samples if s is not None]
                    for sample in samples:
                        sample_dict = (
                            publisher_utils.meter_message_from_counter(
                                sample, self._telemetry_secret
                            ))
                        if self._batch:
                            sample_batch.append(sample_dict)
                        else:
                            self._send_notification([sample_dict])
                    if sample_batch:
                        self._send_notification(sample_batch)
 
                except plugin_base.PollsterPermanentError as err:
                    LOG.error(_(
                        'Prevent pollster %(name)s for '
                        'polling source %(source)s anymore!')
                        % ({'name': pollster.name, 'source': source_name}))
                    self.resources[key].blacklist.extend(err.fail_res_list)
                except Exception as err:
                    LOG.warning(_(
                        'Continue after error from %(name)s: %(error)s')
                        % ({'name': pollster.name, 'error': err}),
                        exc_info=True)

 关键是获得samples:

samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )

通过不同的监控项插件,轮询pipeline中的监控指标,获得对应的sample值,最后调用PollingTaskself._send_notification(sample_batch)来将收集到的监控项的数据进行发送到消息队列中。

def _send_notification(self, samples):
    LOG.debug("Send sample:%s" % samples)
    self.manager.notifier.sample(
        self.manager.context.to_dict(),
        'telemetry.polling',
        {'samples': samples}
    )

通过消息队列的形式,以oslo.messaging对象的sample方法来将监控数据发送到消息队列,发送的数据就是一个字典{'samples': samples}







【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。