|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys, os
import ConfigParser
import ansible.runner
import time
inventory_file = "../host.ini"
database_path = "../database"
config_file = "../config.ini"
>
def __init__(self):
cf = ConfigParser.ConfigParser()
cf.read(config_file)
self.master_load = int(cf.get("ansible", "master_load"))
self.slave_load = int(cf.get("ansible", "slave_load"))
self.code_home_path = cf.get("ansible", "code_home")
self.ansibleResult = self.code_home_path + "/ansibleResult"
self.database_path = self.code_home_path + "/database"
self.avail_master = []
self.avail_slave = []
self.loadBalancing = {}
def extractHost(self,config_file_path):
cf = ConfigParser.ConfigParser(allow_no_value = True)
cf.read(config_file_path)
s = cf.sections()
#print 'section:', s
ms = cf.options("master")
#print 'options:', ms
for x in ms:
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='ping',
module_args='',
pattern=x
)
res = runner.run()
#print res
if res['dark'] and res['dark'][x] and res['dark'][x]['failed']:
print "Host Error:", x, "Unreachable"
continue
self.avail_master.append(x)
self.avail_master = list(set(self.avail_master))
print self.avail_master
ss = cf.options("slave")
for x in ss:
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='ping',
module_args='',
pattern=x
)
res = runner.run()
#print res
if res['dark'] and res['dark'][x] and res['dark'][x]['failed']:
continue
self.avail_slave.append(x)
self.avail_slave = list(set(self.avail_slave))
# make true the host is unique
for host in self.avail_master:
if host in self.avail_slave:
self.avail_slave.remove(host)
print self.avail_slave
def calculateLoadBalancing(self):
load_count = int(os.popen("ls %s | grep ^mygraph | wc -l" % database_path).read())
self.load_count = load_count
current_load_count = load_count
print load_count
print self.master_load, self.slave_load
master_all_count = len(self.avail_master)
slave_all_count = len(self.avail_slave)
for m in self.avail_master:
temp_count = min(current_load_count, int(load_count * (self.master_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
print temp_count
if temp_count > 0:
self.loadBalancing[m] = temp_count
current_load_count -= temp_count
for s in self.avail_slave:
temp_count = min(current_load_count, int(load_count * (self.slave_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
if temp_count > 0:
self.loadBalancing = temp_count
current_load_count -= temp_count
if current_load_count > 0 and master_all_count > 0:
self.loadBalancing[self.avail_master[0]] += current_load_count
current_load_count = 0
if current_load_count > 0 and slave_all_count > 0:
self.loadBalancing[self.avail_slave[0]] += current_load_count
current_load_count = 0
print self.loadBalancing
def fileTransfer(self):
print self.code_home_path
print self.ansibleResult
if self.ansibleResult == "":
print "ERROR: the home of the result about ansible is null!"
sys.exit(1)
current_database_count = 0
#clean the directory of the result
for host in self.loadBalancing:
print host
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='file',
module_args='path=%s state=absent' % self.ansibleResult,
pattern=host
)
res = runner.run()
#print res
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='file',
module_args='path=%s state=directory mode=0750' % self.ansibleResult,
pattern=host
)
res = runner.run()
#print res
# transfer the database
this_host_load_count = self.loadBalancing[host]
for i in range(this_host_load_count) :
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='copy',
module_args='src=%s/mygraph%s dest=%s mode=0750' % (self.database_path, current_database_count + i, self.ansibleResult),
pattern=host
)
res = runner.run()
print res
#print current_database_count
#print this_host_load_count
current_database_count += this_host_load_count
# transfer the code
file_code_list = ["community.jar", "config.ini", "runAll.sh","lib"]
for f in file_code_list:
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='copy',
module_args='src=%s/%s dest=%s mode=770' % (self.code_home_path, f, self.code_home_path),
pattern=host
)
res = runner.run()
print res
# run the code distributed
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name="shell",
module_args='chdir=%s nohup bash runAll.sh %s > bash.log &' % (self.code_home_path, self.ansibleResult),
pattern=host
)
res = runner.run()
print res
print "The Host named", host, "is running!"
def informationTransfer(self, transfer_module_name, transfer_module_args, host):
print transfer_module_args
print transfer_module_name
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name=transfer_module_name,
module_args=transfer_module_args,
pattern=host
)
res = runner.run()
print res
def checkPoint(self):
os.system("rm -rf %s/output" % self.database_path)
os.system("mkdir -p %s/output" % self.database_path)
print "Begining CheckPoint"
host_keys = self.loadBalancing.keys()
#print host_keys
while len(host_keys) > 0:
for host in host_keys:
runner = ansible.runner.Runner(
host_list=inventory_file,
module_name='command',
module_args='removes=%s/success_log pwd' % self.ansibleResult,
pattern=host
)
res = runner.run()
print res
if res['contacted'] and res['contacted'][host] and "skipped" in res['contacted'][host]['stdout']:
continue
else:
print "Host", host, "finished his tasks!"
host_keys.remove(host)
self.informationTransfer("synchronize", "src=%s/output dest=%s mode=pull" % (self.ansibleResult, self.database_path), host)
print host_keys
print "to sleep"
time.sleep(5)
if __name__ == "__main__":
ob = DistributedByAnsible()
ob.extractHost(inventory_file)
#ob.calculateLoadBalancing();
#ob.fileTransfer();
#ob.checkPoint();
print "Done"
|
|