joozh 发表于 2018-1-3 11:46:03

Ansible 多机文件分发、执行脚本并单机合并实验结果(Check point, 多线程异步执行,主机状态检测等)

#!/usr/bin/env python  
# -*- coding:utf-8 -*-
  

  
import sys, os
  
import ConfigParser
  
import ansible.runner
  
import time
  

  
inventory_file = "../host.ini"
  
database_path = "../database"
  
config_file = "../config.ini"
  


  
>  
   def __init__(self):
  
         cf = ConfigParser.ConfigParser()
  
   cf.read(config_file)
  
   self.master_load = int(cf.get("ansible", "master_load"))
  
         self.slave_load = int(cf.get("ansible", "slave_load"))
  

  
   self.code_home_path = cf.get("ansible", "code_home")
  
   self.ansibleResult = self.code_home_path + "/ansibleResult"
  
   self.database_path = self.code_home_path + "/database"
  

  

  
   self.avail_master = []
  
   self.avail_slave = []
  
   self.loadBalancing = {}
  

  
   def extractHost(self,config_file_path):
  
         cf = ConfigParser.ConfigParser(allow_no_value = True)
  
         cf.read(config_file_path)
  
         s = cf.sections()
  
         #print 'section:', s
  
         ms = cf.options("master")
  
         #print 'options:', ms
  
   for x in ms:
  
         runner = ansible.runner.Runner(
  
         host_list=inventory_file,
  
             module_name='ping',
  
         module_args='',
  
         pattern=x
  
         )
  
         res = runner.run()
  
         #print res
  
         if res['dark'] and res['dark'] and res['dark']['failed']:
  
         print "Host Error:", x, "Unreachable"
  
             continue
  
         self.avail_master.append(x)
  
   self.avail_master = list(set(self.avail_master))
  
   print self.avail_master
  
   ss = cf.options("slave")
  
   for x in ss:
  
         runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
             module_name='ping',
  
         module_args='',
  
         pattern=x
  
         )
  
         res = runner.run()
  
         #print res
  
         if res['dark'] and res['dark'] and res['dark']['failed']:
  
             continue
  
         self.avail_slave.append(x)
  
   self.avail_slave = list(set(self.avail_slave))
  

  
   # make true the host is unique
  
   for host in self.avail_master:
  
         if host in self.avail_slave:
  
             self.avail_slave.remove(host)
  
   print self.avail_slave
  

  
   def calculateLoadBalancing(self):
  
         load_count = int(os.popen("ls %s | grep ^mygraph | wc -l" % database_path).read())
  
   self.load_count = load_count
  
   current_load_count = load_count
  
   print load_count
  
   print self.master_load, self.slave_load
  
   master_all_count = len(self.avail_master)
  
   slave_all_count = len(self.avail_slave)
  

  
   for m in self.avail_master:
  
         temp_count = min(current_load_count, int(load_count * (self.master_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
  
         print temp_count
  
         if temp_count > 0:
  
               self.loadBalancing = temp_count
  
         current_load_count -= temp_count
  
         for s in self.avail_slave:
  
         temp_count = min(current_load_count, int(load_count * (self.slave_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
  
         if temp_count > 0:
  
             self.loadBalancing = temp_count
  
         current_load_count -= temp_count
  

  
   if current_load_count > 0 and master_all_count > 0:
  
         self.loadBalancing] += current_load_count
  
         current_load_count = 0
  
   if current_load_count > 0 and slave_all_count > 0:
  
         self.loadBalancing] += current_load_count
  
         current_load_count = 0
  
   print self.loadBalancing
  

  
   def fileTransfer(self):
  
         print self.code_home_path
  
   print self.ansibleResult
  
   if self.ansibleResult == "":
  
         print "ERROR: the home of the result about ansible is null!"
  
         sys.exit(1)
  
   current_database_count = 0
  
   #clean the directory of the result
  
   for host in self.loadBalancing:
  
         print host
  
         runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
         module_name='file',
  
         module_args='path=%s state=absent' % self.ansibleResult,
  
         pattern=host
  
         )
  
         res = runner.run()
  
         #print res
  
         runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
         module_name='file',
  
         module_args='path=%s state=directory mode=0750' % self.ansibleResult,
  
         pattern=host
  
         )
  
         res = runner.run()
  
         #print res
  

  
         # transfer the database
  
             this_host_load_count = self.loadBalancing
  
         for i in range(this_host_load_count) :
  
             runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
             module_name='copy',
  
             module_args='src=%s/mygraph%s dest=%s mode=0750' % (self.database_path, current_database_count + i, self.ansibleResult),
  
             pattern=host
  
         )
  
         res = runner.run()
  
         print res
  
         #print current_database_count
  
         #print this_host_load_count
  
         current_database_count += this_host_load_count
  

  
         # transfer the code
  
         file_code_list = ["community.jar", "config.ini", "runAll.sh","lib"]
  
         for f in file_code_list:
  
             runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
             module_name='copy',
  
             module_args='src=%s/%s dest=%s mode=770' % (self.code_home_path, f, self.code_home_path),
  
             pattern=host
  
         )
  
         res = runner.run()
  
         print res
  

  
         # run the code distributed
  
         runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
         module_name="shell",
  
         module_args='chdir=%s nohup bash runAll.sh %s > bash.log &' % (self.code_home_path, self.ansibleResult),
  
         pattern=host
  
         )
  

  
         res = runner.run()
  
         print res
  
         print "The Host named", host, "is running!"
  

  
   def informationTransfer(self, transfer_module_name, transfer_module_args, host):
  
         print transfer_module_args
  
   print transfer_module_name
  
   runner = ansible.runner.Runner(
  
         host_list=inventory_file,
  
         module_name=transfer_module_name,
  
         module_args=transfer_module_args,
  
         pattern=host
  
   )
  
   res = runner.run()
  
   print res
  

  
   def checkPoint(self):
  
         os.system("rm -rf %s/output" % self.database_path)
  
         os.system("mkdir -p %s/output" % self.database_path)
  
         print "Begining CheckPoint"
  
   host_keys = self.loadBalancing.keys()
  
   #print host_keys
  
   while len(host_keys) > 0:
  
         for host in host_keys:
  
         runner = ansible.runner.Runner(
  
             host_list=inventory_file,
  
             module_name='command',
  
             module_args='removes=%s/success_log pwd' % self.ansibleResult,
  
             pattern=host
  
         )
  
         res = runner.run()
  
         print res
  
         if res['contacted'] and res['contacted'] and "skipped" in res['contacted']['stdout']:
  
             continue
  
         else:
  
             print "Host", host, "finished his tasks!"
  
             host_keys.remove(host)
  
             self.informationTransfer("synchronize", "src=%s/output dest=%s mode=pull" % (self.ansibleResult, self.database_path), host)
  
             print host_keys
  
             print "to sleep"
  
             time.sleep(5)
  

  

  
if __name__ == "__main__":
  
   ob = DistributedByAnsible()
  
   ob.extractHost(inventory_file)
  
   #ob.calculateLoadBalancing();
  
   #ob.fileTransfer();
  
   #ob.checkPoint();
  
   print "Done"
  

  
页: [1]
查看完整版本: Ansible 多机文件分发、执行脚本并单机合并实验结果(Check point, 多线程异步执行,主机状态检测等)