if self.meter_type == 'metering':
meters = [
meter_message_from_counter_beta(
counter,
self.topic_secret[self.meter_type],
source)
for counter in counters if type(counter) == sample.Sample
]
else:
meters = [
meter_message_from_counter_beta(
counter,
self.topic_secret[self.meter_type],
source)
for counter in counters
]
topic = self.meter_type 三、接收数据collector模块
1、ceilometer/ceilometer/collector/service.py文件initialize_service_hook中定义worker用于接收rpc发送过来的数据。详细配置例如以下:
for k, v in cfg.CONF.publisher_rpc.iteritems():
if k.endswith('topic'):
self.conn.create_worker(
v,
rpc_dispatcher.RpcDispatcher([self]),
'ceilometer.collector.' + v,
)
2、定义对应的target用于接收採集的数据,例如以下所看到的:
def record_server_data(self, context, data):
for dispatcher in self.dispatchers:
dispatcher.record_data(context, data, 'server')
当中。最后一个參数指定本次发送过来的topic类型。在调用dispatch.record_data方法时。用于调用对应的存储数据的方法,进而使得数据持久化到不同的数据库表中。
def record_data(self, context, data, meter_type):
if not isinstance(data, list):
data = [data]
for meter in data:
if self.secret_method[meter_type] and
publisher_rpc.verify_signature(meter,
self.secret_method[meter_type][0]):
try:
if meter.get('timestamp'):
meter['timestamp'] =
self.time_to_date(meter['timestamp'])
method = getattr(self.storage_conn.__class__,
self.secret_method[meter_type][1])
method(self.storage_conn, meter)
except Exception as err:
LOG.error('Failed to record metering data: %s', err)
LOG.exception(err)
else:
LOG.warning(
'message signature invalid, discarding message: %r',
meter) 四、持久化存储模块
1、因为新增宿主机数据模块的监測,需增一张数据库表,所以storage模块也需做对应的改动。
首先,须要改动ceilometer/ceilometer/storage/base.py文件,添加接口: