设为首页 收藏本站
查看: 1135|回复: 0

[经验分享] python抓取系统metrics吐给kafka

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2016-6-27 10:14:19 | 显示全部楼层 |阅读模式
本篇介绍用python写脚本,抓取系统metrics,然后调用kafka client library把metrics吐给kafka的案例分享。对于用kafka的同学实用性很高。
在运行本实例前需要先下载两个python库到本地 : six和kafka-python

cat config_system_metrics.json
{
   "env": {
    "site": "cluster",
    "component": "namenode",
    "metric_prefix": "system"
   },
   "output": {
     "kafka": {
       "topic": "system_metrics_cluster",
       "brokerList": ["10.10.10.1:9092", "10.10.10.2:9092", "10.10.10.3:9092"]
     }
   }
}

cat system_metrics.python

#!/usr/bin/env python
import sys
import os
import json
import socket
import re
import time
import logging
import threading

# load kafka-python
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/six'))
import six

# load kafka-python
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/kafka-python'))
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
                    datefmt='%m-%d %H:%M')


CONFIG_FILE = 'config_system_metrics.json'


class LoadConfig(object):
        def __init__(self):
                config_file = "./" + CONFIG_FILE

                try:
                        f = open(config_file, 'r')
                except Exception, e:
                        print "Load config file %s Error !" % config_file
                        sys.exit(1)
                try:
                        config_json = json.loads(f.read())
                except Exception, e:
                        print "Convert config file to Json format Error !"
                        sys.exit(1)

                if f:
                        f.close()
                self.config = config_json


class Kafka(LoadConfig):

        def __init__(self):
                LoadConfig.__init__(self)
                self.broker = self.config["output"]["kafka"]["brokerList"]


        def kafka_connect(self):
                #print "Connecting to kafka "+str(self.broker)
                # To send messages synchronously
                kc = KafkaClient(self.broker,timeout = 30)
                producer = SimpleProducer(kc,async=False,batch_send=True)
                return kc, producer


        def kafka_produce(self, producer, topic, kafka_json):

                # ************  Sample of kafka_json ********************

                # {'timestamp': 1463710, 'host': 'xxx', 'metric': 'system.nic.receivedbytes', 'value': '4739', 'component': 'namenode', 'site': 'apolloqa'}

                # ******************************************************

                # Note that the application is responsible for encoding messages to type str
                producer.send_messages(topic, kafka_json)

class Metric(LoadConfig):
        def __init__(self):
                LoadConfig.__init__(self)
                try :
                        self.fqdn = socket.getfqdn()
                except Exception , e:
                        print "Could not get hostname ! %s " %e
                        sys.exit(1)
                self.data = []
                self.datapoint = {}
                self.datapoint["timestamp"] = int(round(time.time() * 1000))
                self.datapoint["host"] = self.fqdn
                self.datapoint["component"] = self.config['env']['component']
                self.datapoint["site"] = self.config['env']['site']


class Metric_Uptime(Metric):
        def __init__(self):
                Metric.__init__(self)
                self.demensions = ["uptime.day", "idletime.day"]
                self.result = os.popen('cat /proc/uptime').readlines()
                self.data = []

        def metric_collect(self):
                for line in self.result:
                        values = re.split("\s+", line.rstrip())
                        for i in range(len(self.demensions)):
                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'uptime' + '.' + self.demensions[i]
                                self.datapoint["value"] = str(round(float(values[i]) / 86400 , 2))
                                self.data.append(self.datapoint.copy())
                return self.data

class Metric_Loadavg(Metric):
        def __init__(self):
                Metric.__init__(self)
                self.demensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
                self.result = os.popen('cat /proc/loadavg').readlines()
                self.data = []

        def metric_collect(self):
                for line in self.result:
                        values = re.split("\s+", line.strip())
                        for i in range(len(self.demensions)):
                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'loadavg' + '.' + self.demensions[i]
                                self.datapoint["value"] = values[i]
                                self.data.append(self.datapoint.copy())
                return self.data

class Metric_Memory(Metric):
        def __init__(self):
                Metric.__init__(self)
                self.result = os.popen('cat /proc/meminfo').readlines()
                self.data = []

        def metric_collect(self):
                for line in self.result:
                        demensions = re.split(":?\s+", line.rstrip())
                        self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'memory' + '.' + demensions[0] + '.kB'
                        self.datapoint["value"] = demensions[1]
                        self.data.append(self.datapoint.copy())
                return self.data

class Metric_CpuTemp(Metric):
        def __init__(self):
                Metric.__init__(self)
                self.result = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
                self.data = []

        def metric_collect(self):
                for line in self.result:
                        demensions = re.split("\|", line.strip())
                        self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + re.split(" ", demensions[0])[0] + '.Temp'
                        self.datapoint["value"] = re.split(" ", demensions[1])[1]
                        self.data.append(self.datapoint.copy())
                print self.data
                return self.data

class Metric_Net(Metric):
        def __init__(self):
                Metric.__init__(self)
                self.demensions = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets',
                 'transmiterrs', 'transmitdrop']
                self.result = os.popen("cat /proc/net/dev").readlines()
                self.data = []

        def metric_collect(self):
                for line in self.result:
                        if re.match('^(Inter|\s+face|\s+lo)', line) :
                                continue
                        interface = re.split(':?\s+', line)[1]
                        values = re.split(':?\s+', line)[2:6] + re.split(':?\s+', line)[9:13]
                        for i in range(len(self.demensions)):
                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + interface + "." + self.demensions[i]
                                self.datapoint["value"] = values[i]
                                self.data.append(self.datapoint.copy())
                print self.data
                return self.data


class Collect(LoadConfig):
        def __init__(self):
                LoadConfig.__init__(self)
                self.topic = self.config['output']['kafka']['topic']

        def connect(self):
                self.kafkaclient = Kafka()
                self.kc, self.producer = self.kafkaclient.kafka_connect()
                return self.kc, self.producer

        def send(self, kafka_producer, msg):
                self.kafkaclient.kafka_produce(kafka_producer, self.topic, json.dumps(msg))

        def close(self, kafka_producer, kafka_client):
                if kafka_producer is not None:
                        kafka_producer.stop()
                if kafka_client is not None:
                        kafka_client.close()

        def run(self, kafka_client, kafka_producer, metric):
                data = metric.metric_collect()
                #print data
                self.send(kafka_producer, data)
                self.close(kafka_producer, kafka_client)

collector = Collect()
metrics = [Metric_Uptime(), Metric_Loadavg(), Metric_Memory(), Metric_CpuTemp(), Metric_Net()]

# Establish Tcp connection once forever, share the same Tcp conncetion
kafka_client, kafka_producer = collector.connect()
for metric in metrics:
        t = threading.Thread(target = collector.run , args = (kafka_client, kafka_producer, metric) )
        t.start()



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-235867-1-1.html 上篇帖子: storm+kafka集成简单应用 下篇帖子: kafka 基本原理和概念 python
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表