设为首页 收藏本站
查看: 1486|回复: 0

[经验分享] openstack 之 ceilometer: Notification

[复制链接]

尚未签到

发表于 2015-10-11 08:03:54 | 显示全部楼层 |阅读模式
openstack 之 ceilometer: Notification
  注:本文以ceilometer stable/kilo代码为例分析
ceilometer Notification实现架构
  ceilometer Notification即是从消息队列中获取消息并将获取的消息转换成event事件(部分Notification生成sample)。ceilometer获取并存储Notification的结构图如下 :

DSC0000.jpg
ceilometer获取的消息格式
  通常,从消息队列中获取的消息格式如下:


{
'_context_roles': ['Member'],
'_context_request_id': 'req-ee35abb9-8bfd-48fa-9213',
'_context_quota_class': None,
'event_type': u'compute.instance.create.start',
'_context_service_catalog': [{
'endpoint': [{
'adminURL': 'http://127.0.0.1:8774/v2/tenant_id',
'region': 'RegionOne',
'id': '28d096fd6bf545e3b5de8a482de06d2d',
'internalURL': 'http://127.0.0.1:8774/v2/tenant_id',
'publicURL': 'http://127.0.0.1:8774/v2/tenant_id'
}],
'endpoints_links': [],
'type': 'compute',
'name': u'nova'}
}],
'timestamp': '2015-08-20 10:50:14.930752',
'_context_user': '329d6d053f83464a9f02e1cc9c560371',
'_context_auth_token': '1d05041c7b434f9c0ea95dccc',
'_context_user_id': '329d6d053f83464a9f02e1cc9c560371',  
'payload': {},
'_context_read_deleted': 'no',
'_context_tenant': '079db7f7184c49ac94a5fb05a994904a',
'_context_is_admin': False,
'_context_project_id': '079db7f7184c49ac94a5fb05a994904a',
'_context_timestamp':  '2015-08-20T10:50:14.385672',
'publisher_id': 'aabbcc.ubuntu',
'message_id': '581f2ad8-40d7-468b-a692-733127ded3d4',
'_context_remote_address': u'127.0.0.1'
}
  消息的主图内容存放在payload中,一般从payload中可以取到消息对应的资源的一些信息。
ceilometer notification plugin
  在ceilometer/agent/plugin_base.py中定义了Notification plugin的基础类,代码如下:


@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, transporter):
super(NotificationBase, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
self.requeue = False if hasattr(transporter, 'publisher') else True
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings.
Strings are defining the event types to be given to this plugin.
"""
def get_targets(self, conf):
"""Return a sequence of oslo.messaging.Target.
Sequence is defining the exchange and topics to be connected for this
plugin.
:param conf: Configuration.
"""
# TODO(sileht): Backwards compatibility, remove in J+2
if hasattr(self, 'get_exchange_topics'):
LOG.warn(_('get_exchange_topics API of NotificationPlugin is'
'deprecated, implements get_targets instead.'))
targets = []
for exchange, topics in self.get_exchange_topics(conf):
targets.extend(oslo_messaging.Target(topic=topic,
exchange=exchange)
for topic in topics)
return targets
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.
:param message: Message to process.
"""
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
"""
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
def to_samples_and_publish(self, context, notification):
"""Return samples produced by *process_notification*.
Samples produced for the given notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
if self.requeue:
meters = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
for sample in self.process_notification(notification)
]
for notifier in self.transporter:
notifier.sample(context.to_dict(),
event_type='ceilometer.pipeline',
payload=meters)
else:
with self.transporter.publisher(context) as p:
p(list(self.process_notification(notification)))
  基本上,只需要继承这个类,然后实现event_types、process_notification以及get_targets方法即可。event_types定义了过滤的事件类型,通过fnmatch匹配;

process_notification即为将notification处理成sample的格式;get_targets设置连接消息队列;

下面以compue为例介绍,ceilometer/compute/notifications/__init__py中定义如下:


class ComputeNotificationBase(plugin_base.NotificationBase):
@staticmethod
def get_targets(conf):
"""Return a sequence of oslo.messaging.Target
This sequence is defining the exchange and topics to be connected for
this plugin.
"""
return [oslo.messaging.Target(topic=topic,
exchange=conf.nova_control_exchange)
for topic in conf.notification_topics]
  此处实现了get_targets方法,即设置nova中定义的的exchange并监听相关的设置队列。ceilometer/compute/notifications/instance.py中定义了具体的event_types、process_notification,代码实现如下


@six.add_metaclass(abc.ABCMeta)
class UserMetadataAwareInstanceNotificationBase(
notifications.ComputeNotificationBase):
"""Consumes notifications containing instance user metadata."""
def process_notification(self, message):
instance_properties = self.get_instance_properties(message)
if isinstance(instance_properties.get('metadata'), dict):
src_metadata = instance_properties['metadata']
del instance_properties['metadata']
util.add_reserved_user_metadata(src_metadata, instance_properties)
return self.get_sample(message)
def get_instance_properties(self, message):
"""Retrieve instance properties from notification payload."""
return message['payload']
@abc.abstractmethod
def get_sample(self, message):
"""Derive sample from notification payload."""

class InstanceScheduled(UserMetadataAwareInstanceNotificationBase,
plugin_base.NonMetricNotificationBase):
event_types = ['scheduler.run_instance.scheduled']
def get_instance_properties(self, message):
"""Retrieve instance properties from notification payload."""
return message['payload']['request_spec']['instance_properties']
def get_sample(self, message):
yield sample.Sample.from_notification(
name='instance.scheduled',
type=sample.TYPE_DELTA,
volume=1,
unit='instance',
user_id=None,
project_id=message['payload']['request_spec']
['instance_properties']['project_id'],
resource_id=message['payload']['instance_id'],
message=message)

class ComputeInstanceNotificationBase(
UserMetadataAwareInstanceNotificationBase):
"""Convert compute.instance.* notifications into Samples."""
event_types = ['compute.instance.*']
  除了要获取从消息队列中获取Notification存储外,还会通过event_types过滤出部分的事件,通过process_notification实现Notification->Smaple的转换。
ceilometer traits处理
  在获取了notification之后,怎么处理这些event事件呢?ceilometer中提供了一个event_definations.yaml配置文件,在这个配置文件中,可以将Notification转换成按照event_definations.yaml中定义的格式,以compute事件为例,如下:


---
- event_type: compute.instance.*
traits: &instance_traits
tenant_id:
fields: payload.tenant_id
user_id:
fields: payload.user_id
instance_id:
fields: payload.instance_id
host:
fields: publisher_id
plugin:
name: split
parameters:
segment: 1
max_split: 1
service:
fields: publisher_id
plugin: split
memory_mb:
type: int
fields: payload.memory_mb
disk_gb:
type: int
fields: payload.disk_gb
root_gb:
type: int
fields: payload.root_gb
ephemeral_gb:
type: int
fields: payload.ephemeral_gb
vcpus:
type: int
fields: payload.vcpus
instance_type_id:
type: int
fields: payload.instance_type_id
instance_type:
fields: payload.instance_type
state:
fields: payload.state
os_architecture:
fields: payload.image_meta.'org.openstack__1__architecture'
os_version:
fields: payload.image_meta.'org.openstack__1__os_version'
os_distro:
fields: payload.image_meta.'org.openstack__1__os_distro'
launched_at:
type: datetime
fields: payload.launched_at
deleted_at:
type: datetime
fields: payload.deleted_at
- event_type: compute.instance.exists
traits:
<<: *instance_traits
audit_period_beginning:
type: datetime
fields: payload.audit_period_beginning
audit_period_ending:
type: datetime
fields: payload.audit_period_ending
  event_type定义了符合匹配条件的event事件,traits定义从Notification中转换的数据。其中&resource_traits定义基础的traits项,<<: *resource_traits定义的是附加的traits项。

每个trait项由type、fields、plugin组成。type为可选项,默认是str,有datetime,int等可选;fields项,为从Notification获取的值,可以是数组;plugin项为调用已定义好的traits处理函数;
ceilometer Event格式及存储
  将Notification根据event_definations.yaml处理完成后,生成的数据被定义成Event,格式如下:


{
message_id: “XXX”,
event_type: “compute.instance.create.end”,
timestamp: “yyyy-MM-dd mm-hh-ss”,
traits: [{key: ‘key’, value: ‘value’, type: ‘type’}……]
}

Event查询
  Ceilometer提供了Event事件查询的API接口,可以根据start_timestamp/end_timestamp,event_type/message_id查询Event事件。

如:


curl -X GET -H 'X-Auth-Token: ******' http://127.0.0.1:8774/v2/events?q.field=event_type&q.field=compute.instance.create.end
  但实时上,event的这种存储方式比较的难用,如果想查询某个项目下的所有compute相关事件,实时上是非常难查询的,所以本人在实际开发中对event事件做了一些变动,以满足实际中按项目查询的需求。

版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-125247-1-1.html 上篇帖子: RDO 安装 OpenStack 下篇帖子: Openstack Zoning – Region/Availability Zone/Host Aggregate
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表