Ceilometer Source Code Analysis
Ceilometer Overall Architecture
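The services in the Ceilometer component exist mainly for data collection and for evaluating and reporting alarms, and every service it contains serves one of these two functions. Ceilometer also provides a REST API that higher-level operations and maintenance components can call. The ceilometer agent service actively collects raw data. The notification service receives, from the message queue, the event messages that other components send. The ceilometer collector service pulls the collected data off the message queue. The ceilometer alarm service evaluates and reports alarms.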
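Data flows through Ceilometer as shown in the figure:
1. The Ceilometer agent service actively calls other components' APIs, the libvirt API, or host OS commands to obtain raw data, and sends that data to the message queue;
2. The Ceilometer notification service listens on the corresponding channels of the message queue to obtain the event messages that other components send to Ceilometer;
3. All of this data is eventually gathered by the Ceilometer collector service and stored in the database.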
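The three-step flow above can be sketched as a toy, single-process simulation. Everything here is a hypothetical stand-in, not Ceilometer code: a `queue.Queue` plays the message queue, a list plays the database, and the functions `agent_poll`, `notification_receive` and `collector_drain` are illustrative names for the three roles. As a simplification, the notification side turns each event into one message with volume 1.

```python
import queue

# Hypothetical in-memory stand-ins: queue.Queue plays the message queue
# and a plain list plays the collector's database.
message_queue = queue.Queue()
database = []


def agent_poll(readings):
    """Agent: publish actively collected raw readings to the queue."""
    for name, volume in readings.items():
        message_queue.put({"origin": "agent", "name": name, "volume": volume})


def notification_receive(events):
    """Notification service: republish events from other services.

    Simplification: each event becomes one message with volume 1.
    """
    for event_type in events:
        message_queue.put({"origin": "notification", "name": event_type,
                           "volume": 1})


def collector_drain():
    """Collector: take every pending message off the queue and store it."""
    while True:
        try:
            database.append(message_queue.get_nowait())
        except queue.Empty:
            break


agent_poll({"cpu_util": 12.5, "memory.usage": 512})
notification_receive(["compute.instance.create.end"])
collector_drain()
print(len(database))  # 3
```

The point of the decoupling is that producers (agent, notification) never talk to storage directly. Only the collector touches the database, and it simply drains whatever the queue holds.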
Source Code Analysis (Raw Data Acquisition)
1. Code entry point: the entry point of ceilometer agent compute is in ceilometer.cmd.polling, which creates and launches the service:
def main():
    service.prepare_service()
    os_service.launch(CONF, manager.AgentManager(CONF.polling_namespaces,
                                                 CONF.pollster_list)).wait()
2. Reading the pipeline and creating the polling tasks
def start(self):
    self.polling_manager = pipeline.setup_polling()

    self.partition_coordinator.start()
    self.join_partitioning_groups()

    self.pollster_timers = self.configure_polling_tasks()

    self.init_pipeline_refresh()
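On a compute node, the agent calls the start method of the AgentManager class in ceilometer/agent/manager.py, and start calls configure_polling_tasks.
configure_polling_tasks calls setup_polling_tasks, which reads the pipeline file and builds the polling tasks for the configured meters:
data = self.setup_polling_tasks()
setup_polling_tasks calls create_polling_task to create each PollingTask object:
polling_task = self.create_polling_task()
Each polling task then does its work by calling the poll_and_notify method of the PollingTask class.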
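The grouping done by setup_polling_tasks can be sketched as follows. This is a simplified model under stated assumptions: as in upstream Ceilometer of this era, one PollingTask is shared by all pipeline sources with the same polling interval, and it records pollsters per source name in `pollster_matches` (the attribute poll_and_notify iterates over). The source dicts and plain-string pollsters are hypothetical stand-ins for pipeline source objects and pollster extensions, and exact membership replaces the real `support_meter` wildcard matching.

```python
from collections import defaultdict


class PollingTask:
    """Simplified stand-in: pollsters that are polled together."""

    def __init__(self):
        # source name -> pollsters, mirroring pollster_matches in the text
        self.pollster_matches = defaultdict(list)

    def add(self, pollster, source_name):
        self.pollster_matches[source_name].append(pollster)


def setup_polling_tasks(sources, pollsters):
    """Group pollsters into one PollingTask per polling interval.

    `sources` is a hypothetical list of dicts such as
    {"name": ..., "interval": ..., "meters": [...]}.
    """
    polling_tasks = {}
    for source in sources:
        for pollster in pollsters:
            if pollster in source["meters"]:  # simplified support_meter()
                task = polling_tasks.get(source["interval"])
                if task is None:
                    task = PollingTask()
                    polling_tasks[source["interval"]] = task
                task.add(pollster, source["name"])
    return polling_tasks


sources = [
    {"name": "cpu_source", "interval": 600, "meters": ["cpu"]},
    {"name": "disk_source", "interval": 600, "meters": ["disk.read.bytes"]},
    {"name": "net_source", "interval": 60,
     "meters": ["network.incoming.bytes"]},
]
tasks = setup_polling_tasks(
    sources, ["cpu", "disk.read.bytes", "network.incoming.bytes"])
print(sorted(tasks))  # [60, 600]
```

Keying tasks by interval means one timer per distinct interval, rather than one per meter. Each timer fires poll_and_notify for all sources that share that interval.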
poll_and_notify is implemented as follows:
def poll_and_notify(self):
    """Polling sample and notify."""
    cache = {}
    discovery_cache = {}
    poll_history = {}
    for source_name in self.pollster_matches:
        for pollster in self.pollster_matches[source_name]:
            key = Resources.key(source_name, pollster)
            candidate_res = list(
                self.resources[key].get(discovery_cache))
            if not candidate_res and pollster.obj.default_discovery:
                candidate_res = self.manager.discover(
                    [pollster.obj.default_discovery], discovery_cache)

            # Remove duplicated resources and black resources. Using
            # set() requires well defined __hash__ for each resource.
            # Since __eq__ is defined, 'not in' is safe here.
            polling_resources = []
            black_res = self.resources[key].blacklist
            history = poll_history.get(pollster.name, [])
            for x in candidate_res:
                if x not in history:
                    history.append(x)
                    if x not in black_res:
                        polling_resources.append(x)
            poll_history[pollster.name] = history

            # If no resources, skip for this pollster
            if not polling_resources:
                p_context = 'new ' if history else ''
                LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
                           "resources found this cycle"),
                         {'name': pollster.name, 'p_context': p_context})
                continue

            LOG.info(_("Polling pollster %(poll)s in the context of "
                       "%(src)s"),
                     dict(poll=pollster.name, src=source_name))
            try:
                samples = pollster.obj.get_samples(
                    manager=self.manager,
                    cache=cache,
                    resources=polling_resources
                )
                sample_batch = []

                # filter None in samples
                samples = [s for s in samples if s is not None]

                for sample in samples:
                    sample_dict = (
                        publisher_utils.meter_message_from_counter(
                            sample, self._telemetry_secret
                        ))
                    if self._batch:
                        sample_batch.append(sample_dict)
                    else:
                        self._send_notification([sample_dict])

                if sample_batch:
                    self._send_notification(sample_batch)

            except plugin_base.PollsterPermanentError as err:
                LOG.error(_(
                    'Prevent pollster %(name)s for '
                    'polling source %(source)s anymore!')
                    % ({'name': pollster.name, 'source': source_name}))
                self.resources[key].blacklist.extend(err.fail_res_list)
            except Exception as err:
                LOG.warning(_(
                    'Continue after error from %(name)s: %(error)s')
                    % ({'name': pollster.name, 'error': err}),
                    exc_info=True)
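The resource-filtering loop in poll_and_notify can be isolated into a small function to show its effect. `select_polling_resources` is a hypothetical helper name; its body is the same loop as above. Because `poll_history` is created once per poll_and_notify call and keyed by pollster name, a resource that the same pollster sees under two sources in one cycle is polled only once, and blacklisted resources are recorded in the history but never polled.

```python
def select_polling_resources(candidate_res, history, black_res):
    """Same filtering loop as in poll_and_notify.

    A resource is polled only the first time it enters `history` in this
    cycle, and only if it is not blacklisted. `history` is updated in
    place, like poll_history[pollster.name].
    """
    polling_resources = []
    for x in candidate_res:
        if x not in history:
            history.append(x)
            if x not in black_res:
                polling_resources.append(x)
    return polling_resources


history = []  # shared by the same pollster across sources in one cycle
first = select_polling_resources(
    ["vm-1", "vm-2", "vm-1", "vm-3"], history, ["vm-3"])
second = select_polling_resources(["vm-2", "vm-4"], history, [])
print(first)    # ['vm-1', 'vm-2']
print(second)   # ['vm-4']
print(history)  # ['vm-1', 'vm-2', 'vm-3', 'vm-4']
```

When the returned list is empty, poll_and_notify logs "Skip pollster ..." and moves on. The "new " prefix in that message appears when the history is non-empty, meaning the resources exist but were already handled this cycle.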
The key step is obtaining the samples:
samples = pollster.obj.get_samples(
    manager=self.manager,
    cache=cache,
    resources=polling_resources
)
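Each pollster plugin polls its meter from the pipeline for the selected resources and returns the corresponding samples. PollingTask then calls self._send_notification to publish the collected data to the message queue: once with the whole sample_batch when batching is enabled, or once per sample otherwise.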
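The plugin contract that poll_and_notify relies on is small: a `default_discovery` attribute and a `get_samples(manager, cache, resources)` method that yields samples. Below is a toy pollster satisfying that contract. In upstream Ceilometer, real pollsters derive from `plugin_base.PollsterBase`; the class `FakeCPUPollster`, the `FAKE_CPU_TIME` readings and the dict-shaped samples here are hypothetical stand-ins for real `Sample` objects. The example also shows why `cache` is passed in: it is a single dict shared for one polling cycle, so a pollster can avoid repeating an expensive lookup.

```python
# Hypothetical hypervisor readings; a real pollster would query libvirt etc.
FAKE_CPU_TIME = {"vm-1": 1200, "vm-2": 3400}


class FakeCPUPollster:
    """Toy pollster mimicking the members poll_and_notify relies on."""

    # Discovery used when the pipeline source supplies no resources.
    default_discovery = "local_instances"

    def __init__(self):
        self.reads = 0  # how many times the "hypervisor" was queried

    def _read_cpu_time(self, resource):
        self.reads += 1
        return FAKE_CPU_TIME[resource]

    def get_samples(self, manager, cache, resources):
        for resource in resources:
            key = ("cpu", resource)  # namespace the key; cache is shared
            if key not in cache:
                cache[key] = self._read_cpu_time(resource)
            yield {"name": "cpu", "resource_id": resource,
                   "volume": cache[key]}


pollster = FakeCPUPollster()
cache = {}  # one dict per polling cycle, as in poll_and_notify
samples = list(pollster.get_samples(None, cache, ["vm-1", "vm-2"]))
samples += list(pollster.get_samples(None, cache, ["vm-1"]))
print([s["volume"] for s in samples])  # [1200, 3400, 1200]
print(pollster.reads)  # 2
```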
def _send_notification(self, samples):
    LOG.debug("Send sample:%s" % samples)
    self.manager.notifier.sample(
        self.manager.context.to_dict(),
        'telemetry.polling',
        {'samples': samples}
    )
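_send_notification publishes the monitoring data to the message queue by calling the sample method of the oslo.messaging notifier held in self.manager.notifier, with the event type 'telemetry.polling'. The payload sent is the dictionary {'samples': samples}.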
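The batching branch of poll_and_notify and the message shape produced by _send_notification can be checked without a broker. This sketch swaps the real oslo.messaging notifier for a hypothetical `RecordingNotifier` that only records `sample(ctxt, event_type, payload)` calls. It passes an empty dict as the context instead of `self.manager.context.to_dict()`, and uses toy sample dicts in place of the output of `meter_message_from_counter`. `send_samples` is an illustrative helper, not a Ceilometer function.

```python
class RecordingNotifier:
    """Hypothetical stand-in for an oslo.messaging notifier.

    Records sample(ctxt, event_type, payload) calls instead of sending
    them to a message queue.
    """

    def __init__(self):
        self.sent = []

    def sample(self, ctxt, event_type, payload):
        self.sent.append((ctxt, event_type, payload))


def send_samples(notifier, sample_dicts, batch):
    """Mirror the batching branch of poll_and_notify."""
    def _send_notification(samples):
        # Simplification: empty context instead of context.to_dict().
        notifier.sample({}, 'telemetry.polling', {'samples': samples})

    sample_batch = []
    for sample_dict in sample_dicts:
        if batch:
            sample_batch.append(sample_dict)
        else:
            _send_notification([sample_dict])
    if sample_batch:
        _send_notification(sample_batch)


toy_samples = [{"name": "cpu"}, {"name": "memory.usage"}]
batched, unbatched = RecordingNotifier(), RecordingNotifier()
send_samples(batched, toy_samples, batch=True)
send_samples(unbatched, toy_samples, batch=False)
print(len(batched.sent), len(unbatched.sent))  # 1 2
```

Batching trades latency for fewer messages: with it enabled, one polling round for a pollster produces a single message whose 'samples' list holds every sample. Without it, each sample travels in its own one-element list.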