q6542125 发表于 2015-12-15 08:45:55

用python+django+twistd 开发一个属于自己的运维系统

开源的运维系统不少,比如nagios、zabbix、cati等等,但是遇到自己个性化的运维需求的时候,总是显的力不从心!最近在学习python,所以就考虑用python+django+twisted来定做一个完全个性化的运维系统。
运维系统有几个主要的功能:监控、分析、报警、更甚者直接根据分析的结果进行反应操作。而以上几点通过上述的框架可以比较容易的实现。
下面上图说明:



使用freemind整理了下思路:





下面是一些代码段,完整的代码下载见文档底部:
Server:




[*]#!/usr/bin/env python

[*]#coding:utf-8
[*]__author__ = 'dwj'
[*]
[*]
[*]from twisted.internet.protocol import ServerFactory
[*]from twisted.protocols import basic
[*]import cx_Oracle
[*]from twisted.application import service, internet
[*]
[*]
[*]
[*]class Mornitor_Protocol(basic.LineReceiver):
[*]
[*]    def __init__(self):
[*]    #
[*]      _oracle_conn=cx_Oracle.connect('xxxx', 'xxxx', '192.168.7.17/test', threaded=True)
[*]      _oracle_conn.autocommit = True
[*]      self.cur = _oracle_conn.cursor()
[*]      self._oracle_conn=_oracle_conn
[*]
[*]
[*]    def ruku(self, line):
[*]      ip=self.transport.getPeer().host
[*]      #获取客户端IP
[*]      line=line.split(':::')
[*]      #使用:::分割原始数据
[*]      if line[1] in ['cpu', 'mem', 'disk', 'tcp', 'net', 'process_down']:
[*]      #根据数据包头来确定使用insert还是update,当是tcp包头的时候插入,其余的更新
[*]            if line[1] == 'tcp':
[*]                sql = "insert into MORNITOR_BASICINFO (ipadd,time,tcp) values (\'%s\',\'%s\',\'%s\')"%(ip,line[0],line[3])
[*]                print sql
[*]                self.cur.execute(sql)
[*]
[*]            else:
[*]                line_again = line[3].split('::')
[*]                sql = 'update MORNITOR_BASICINFO set %s=\'%s\',%s=\'%s\' where ipadd=\'%s\' and time=\'%s\''%(line[1],line_again[0],line[2],line_again[1],ip,line[0])
[*]                print sql
[*]                self.cur.execute(sql)
[*]
[*]    def connectionMade(self):
[*]      print 'Connected!'
[*]
[*]    def lineReceived(self, line):
[*]      print line
[*]      self.ruku(line)
[*]      #接受到数据之后执行入库操作!
[*]    def connectionLost(self, reason='connectionDone'):
[*]      self._oracle_conn.close()
[*]      print 'The db is close... ok!'
[*]
[*]
[*]class Mornitor_Factory(ServerFactory):
[*]    #还没想好要初始化什么
[*]    def __init__(self,service):
[*]      self.service = service
[*]
[*]    protocol = Mornitor_Protocol
[*]
[*]
[*]class Fish_Service(service.Service):
[*]
[*]    def __init__(self):
[*]      pass
[*]
[*]    def startService(self):
[*]      service.Service.startService(self) #什么都不做,开始服务
[*]
[*]    # def stopService(self):
[*]    # return self._port.stopListening()
[*]
[*]
[*]
[*]#配置参数
[*]port = 10000
[*]iface = '127.0.0.1'
[*]
[*]
[*]
[*]top_server = service.MultiService() #定义服务容器
[*]
[*]fish_server = Fish_Service() #实例化我们的服务
[*]fish_server.setServiceParent(top_server) #把自定义的服务加入到服务容器
[*]
[*]factory = Mornitor_Factory(Fish_Service) #工厂化服务
[*]
[*]tcp_server = internet.TCPServer(port, factory, interface=iface) #定义tcp服务
[*]tcp_server.setServiceParent(top_server) #把tcp服务加入到服务容器
[*]
[*]application = service.Application('Fish_Service') #给应用起个名字
[*]top_server.setServiceParent(application) #把服务容器丢到应用中去
Client端




[*]from twisted.protocols import basic

[*]from twisted.internet import protocol, defer, task
[*]import Get_basic_info_2 as Huoqu
[*]import guardian as shouhu
[*]import time
[*]from twisted.application import service, internet
[*]
[*]
[*]class Monitor_Protocol(basic.LineReceiver):
[*]    #自定义客户端和服务端的连接协议,从basic的line继承
[*]
[*]    def __init__(self):
[*]      #
[*]      pass
[*]
[*]    @staticmethod
[*]    def huoqu_shuju():
[*]      #定义一个函数获取本机的一些状态
[*]      now = str(time.strftime('%Y-%m-%d %H:%M:%S'))
[*]
[*]      def add_tag(source, tag1, tag2 = 'none'):
[*]      #定义格式化字符串函数
[*]            return ':::'.join([now, tag1, tag2, source])
[*]            #使用:::分隔时间、简单信息、详细信息、原始信息
[*]
[*]      tcp = add_tag(Huoqu.net_tcp(), 'tcp')
[*]      cpu = add_tag(Huoqu.cpu(), 'cpu', 'cpu_detail')
[*]      mem = add_tag(Huoqu.mem(), 'mem', 'mem_detail')
[*]      disk = add_tag(Huoqu.disk_usage(), 'disk', 'disk_detail')
[*]      net = add_tag(Huoqu.net_rate(), 'net', 'net_detail')
[*]      process = add_tag(shouhu.check_alive(), 'process_down', 'process_alived')
[*]      result = (tcp, cpu, mem, disk, net, process, )
[*]      d = defer.Deferred()
[*]      #使用defered返回结果
[*]      d.callback(result)
[*]      return d
[*]
[*]    def xunhuan(self, list):
[*]    #定义循环发送函数
[*]      for i in list:
[*]            self.sendLine(i)
[*]
[*]    def fasong(self):
[*]    #定义程序运行顺序,取得信息后用callback交给发送函数发送
[*]      self.huoqu_shuju().addCallback(self.xunhuan)
[*]
[*]    def loop(self):
[*]    #使用twist内置的循环函数定义几秒监控数据传送到服务端
[*]      l = task.LoopingCall(self.fasong)
[*]      l.start(1)
[*]
[*]    def connectionMade(self):
[*]    #覆盖协议的connectmade函数,定义于服务端的连接建立后开始循环
[*]      print 'Connected!......ok!'
[*]      self.loop()
[*]
[*]    def lineReceived(self, line):
[*]    #必须覆盖接受函数,否则twist会报not importent错误!
[*]      pass
[*]
[*]
[*]class Moinitor_client_factory(protocol.ReconnectingClientFactory):
[*]
[*]    def __init__(self, service):
[*]    #还没想要要写什么
[*]      self.service = service
[*]    protocol = Monitor_Protocol
[*]
[*]
[*]class Client_Service(service.Service):
[*]
[*]    def __init__(self):
[*]      pass
[*]
[*]    def startService(self):
[*]      service.Service.startService(self)
[*]
[*]
[*]#配置文件开始
[*]port = 10000
[*]host = '127.0.0.1'
[*]
[*]#守护进程
[*]top_service = service.MultiService() #定义服务容器
[*]
[*]client_service = Client_Service() #实例化服务类
[*]client_service.setServiceParent(top_service) #把自己定义的服务丢到服务容器中
[*]
[*]factory = Moinitor_client_factory(client_service) #定义服务工厂化
[*]
[*]tcp_service = internet.TCPClient(host, port, factory) #定义tcp连接的服务
[*]tcp_service.setServiceParent(top_service) #把tcp服务丢到服务容器中去
[*]
[*]application = service.Application('Fish_Service') #定义应用名字
[*]top_service.setServiceParent(application) #把服务容器丢到应用中去

一些自定义监控程序是否存活的脚本:




[*]program = {'nginx': ['/opt/nginx/logs/nginx.pid', '/opt/nginx/sbin/nginx'],

[*]            'rsync-C': ['/var/run/rsyncd.pid', 'rsync --daemon'],
[*]            }
[*]
[*]
[*]def main():
[*]    for k in program:
[*]      a = get_pid(k, program[k][0])
[*]      if isinstance(a, tuple):
[*]            print '%s is not running!' % k
[*]            print 'Start the program by Horland_guardian!'
[*]            subprocess.call(program[k][1], shell=True)
[*]      else:
[*]            print 'The %s is running!' % k
[*]
[*]
[*]def check_alive():
[*]    l_lived = []
[*]    l_downed = []
[*]    for k in program:
[*]      a = get_pid(k, program[k][0])
[*]      if isinstance(a, tuple):
[*]            l_downed.append(k)
[*]      else:
[*]            l_lived.append(k)
[*]    process_alived = ' '.join(l_lived)
[*]    process_down = ' '.join(l_downed)
[*]
[*]    return '::'.join([process_down, process_alived])

django的使用目前只需要使用到admin模块就可以。
下面是一些代码段:
model




[*]class BasicInfo(models.Model):

[*]    ipadd = models.IPAddressField(verbose_name = u'IP地址')
[*]    time = models.CharField(max_length=50, verbose_name = u'时间')
[*]    cpu = models.CharField(max_length=255, blank=True, verbose_name = u'CPU%')
[*]    cpu_detail = models.CharField(max_length=255, blank=True, verbose_name = u'CPU详情')
[*]    mem = models.CharField(max_length=255, blank=True, verbose_name = u'内存%')
[*]    mem_detail = models.CharField(max_length=255, blank=True, verbose_name = u'内存详情')
[*]    disk = models.CharField(max_length=255, blank=True, verbose_name = u'磁盘%')
[*]    disk_detail = models.CharField(max_length=255, blank=True, verbose_name = u'磁盘详情')
[*]    net = models.CharField(max_length=255, blank=True, verbose_name = u'流量 bytes/s')
[*]    net_detail = models.CharField(max_length=1000, blank=True, verbose_name = u'流量详情')
[*]    tcp = models.CharField(max_length=255, blank=True, verbose_name = u'tcp连接状态')
[*]    process_down = models.CharField(max_length=255, blank=True, verbose_name = u'DOWN-进程')
[*]    process_alived = models.CharField(max_length=255, blank=True, verbose_name = u'Process_UP')
[*]
[*]    def Process_DOWN(self):
[*]      return '<span style="color: #%s;">%s</span>' % ('ff0000', self.process_down) #拓机的进程用红色标识
[*]    Process_DOWN.allow_tags = True

注册到admin




[*]class BasicInfo_admin(admin.ModelAdmin):

[*]
[*]    list_display = ('time', 'cpu', 'cpu_detail', 'mem', 'mem_detail', 'disk', 'disk_detail', 'net', 'net_detail', 'tcp', 'Process_DOWN', 'process_alived')
[*]    list_filter = ('ipadd', )
[*]admin.site.register(BasicInfo, BasicInfo_admin)

freemind整理的思路中还有一些功能没有实现,目前这个只能算个简单的demon吧,但是基本实现了监控的目的。欢迎大家给我留言!
下面上个django的admin界面截图吧!







代码下载
http://download.csdn.net/detail/qcpm1983/7611579


原文链接
页: [1]
查看完整版本: 用python+django+twistd 开发一个属于自己的运维系统