|
本帖最后由 zhuguojun6 于 2020-9-17 20:53 编辑
Python 脚本管理 Jenkins,个人开发,仅供参考,谢谢。
登陆类(jk_auth.py):
[Python] 纯文本查看 复制代码
import jenkins
class auth(object):
    """Keep the Jenkins connection settings together with a client object.

    Attributes:
        url: Value passed as ``url``.
        username: Value passed as ``username``.
        password: Value passed as ``password``.
        server: ``jenkins.Jenkins`` instance built from the three values above.
    """

    def __init__(self, url, username, password):
        """Remember the connection settings and build the Jenkins client.

        Args:
            url: Address of the Jenkins instance, handed to ``jenkins.Jenkins``.
            username: Account name handed to ``jenkins.Jenkins``.
            password: Secret handed to ``jenkins.Jenkins``.
        """
        self.url, self.username, self.password = url, username, password
        self.server = jenkins.Jenkins(url, username, password)
证书管理(jk_credentials.py):
[Python] 纯文本查看 复制代码
import os
import wget
import time
from jk_auth import *
class credential(auth):
    """Create and delete Jenkins credentials.

    When ``folder_name`` is ``'.'`` the credential is managed in the
    ``system::system::jenkins`` store through ``jenkins-cli.jar`` (downloaded
    from the Jenkins instance on every call); otherwise the python-jenkins
    client in ``self.server`` is used for the given folder.
    """

    # Credential store addressed by the CLI when folder_name is '.'.
    _SYSTEM_STORE = 'system::system::jenkins'

    # id_type -> (template file name,
    #             closing tag the secret content is inserted in front of,
    #             whether the <username> line is filled in)
    _TEMPLATES = {
        'ssh_private': ('Credentials_SSH_PrivateKey.xml', '</privateKey>', True),
        'user_pass': ('Credentials_Username_with_password.xml', '</password>', True),
        'secret_txt': ('Credentials_SecretText.xml', '</secret>', False),
    }

    def __init__(self, url, username, password):
        """Set up the connection; see ``auth.__init__`` for the arguments."""
        super().__init__(url, username, password)

    def _run_cli(self, command_args, stdin_text=None):
        """Download ``jenkins-cli.jar`` and run one CLI command with it.

        Args:
            command_args: CLI command name followed by its arguments.
            stdin_text: Optional text fed to the command's standard input
                (encoded as UTF-8).

        Returns:
            The exit code of the ``java`` process.
        """
        # Local import so the file's own import block does not need to change.
        import subprocess

        # Remove a stale copy first. NOTE(review): presumably because
        # wget.download would otherwise save under a different file name.
        if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
        cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')

        # An argument list (no shell) keeps special characters in the URL,
        # user name or password from being interpreted by a shell.
        # NOTE(review): other code in this post authenticates with
        # "-auth user:password"; confirm which form the target Jenkins accepts.
        argv = ['java', '-jar', cli_jar, '-s', self.url]
        argv.extend(command_args)
        argv.extend(['--username', self.username, '--password', self.password])

        payload = None if stdin_text is None else stdin_text.encode('utf-8')
        # subprocess.run blocks until the CLI exits, so no fixed sleep is needed.
        return subprocess.run(argv, input=payload).returncode

    def delete_cre(self, id, folder_name):
        """Delete the credential ``id`` from ``folder_name``.

        Args:
            id: Identifier of the credential.
            folder_name: Folder holding the credential, or ``'.'`` for the
                system store.

        Returns:
            An empty string.
        """
        if folder_name == '.':
            status = self._run_cli(['delete-credentials', self._SYSTEM_STORE, '_', str(id)])
            if status != 0:
                # Report instead of silently claiming success.
                print(f"jenkins-cli exited with code {status} while deleting credential {folder_name}/{id} !")
                return ''
            print(f"credential {folder_name}/{id} is absent !")
        else:
            if self.server.credential_exists(id, folder_name):
                res = self.server.delete_credential(id, folder_name)
                # Only dump a response body when the client hands one back;
                # calling .read() on None would raise AttributeError.
                if hasattr(res, 'read'):
                    print(res.read())
                print(f"delete credential {folder_name}/{id} ok !")
            print(f"credential {folder_name}/{id} is absent !")
        return ''

    def _render_credential(self, id, template_path, id_type, content, username):
        """Fill the XML template for ``id_type`` and return its lines."""
        template_file, closing_tag, fills_username = self._TEMPLATES[id_type]
        with open(f'{template_path}/{template_file}', 'r', encoding='UTF-8') as template:
            template_lines = template.read().split('\n')

        rendered = []
        for line in template_lines:
            if '<id>' in line:
                line = f'<id>{id}</id>'
            elif fills_username and '<username>' in line:
                line = f'<username>{username}</username>'
            elif closing_tag in line:
                # The secret goes on its own line right before the closing tag.
                line = content + '\n' + line
            rendered.append(line)
        return rendered

    def create_cre(self, id, folder_name, template_path, id_type, content, username=''):
        """Create a credential from one of the XML templates.

        Args:
            id: Identifier written into the ``<id>`` element.
            folder_name: Target folder, or ``'.'`` for the system store.
            template_path: Directory containing the ``Credentials_*.xml``
                templates.
            id_type: ``'ssh_private'``, ``'user_pass'`` or ``'secret_txt'``.
            content: Secret text inserted before the closing tag of the
                template (private key, password or secret text).
            username: Written into ``<username>`` for the first two types.

        Returns:
            An empty string.

        Raises:
            SystemExit: With code ``-1`` when ``id_type`` is not supported.
        """
        if id_type not in self._TEMPLATES:
            print("credential type error !")
            raise SystemExit(-1)

        my_cre = self._render_credential(id, template_path, id_type, content, username)

        if folder_name == '.':
            # The XML is piped straight to the CLI, so the secret is never
            # written to a temporary file that could be left behind on failure.
            xml_text = ''.join(f'{line}\n' for line in my_cre)
            status = self._run_cli(
                ['create-credentials-by-xml', self._SYSTEM_STORE, '_'],
                stdin_text=xml_text,
            )
            if status != 0:
                print(f"jenkins-cli exited with code {status} while creating credential {folder_name}/{id} !")
                return ''
        else:
            # Lines are joined without separators here, as before.
            self.server.create_credential(folder_name, ''.join(my_cre))
        print(f"create credential {folder_name}/{id} SUCCESS!")
        return ''
任务管理(jk_jobs.py):
[Python] 纯文本查看 复制代码
import os
import time
import jenkins
from jk_auth import *
class jobs(auth):
    """Create folders and jobs on Jenkins, and run or stop job builds."""

    def __init__(self, url, username, password):
        """Set up the connection; see ``auth.__init__`` for the arguments."""
        super().__init__(url, username, password)

    @staticmethod
    def _split_folder(folder_name):
        """Return ``(name_prefix, url_path)`` for ``folder_name``.

        ``'.'`` and ``''`` both mean "no folder" and give two empty strings.
        For ``'a/b'`` the result is ``('a/b/', '/a/job/b/job')``.
        """
        if folder_name in ('', '.'):
            return '', ''
        url_path = ''.join(f'/{part}/job' for part in str(folder_name).split('/'))
        return f'{folder_name}/', url_path

    def create_folder(self, folder_fullname):
        """Create every missing folder along ``folder_fullname``.

        Args:
            folder_fullname: Slash-separated folder path; ``'.'`` (or an empty
                string) means the top level, where nothing is created.
        """
        if folder_fullname in ('', '.'):
            print("folder system is present.")
            return

        existing = {job['fullname'] for job in self.server.get_all_jobs()}
        folder_path = ''
        for part in str(folder_fullname).split('/'):
            folder_path = f'{folder_path}/{part}'
            current = folder_path.strip('/')
            print(current)
            if current in existing:
                print(f"folder {current} is present.")
            else:
                self.server.create_job(current, jenkins.EMPTY_FOLDER_XML)
                print(f"folder {current} create SUCCESS.")

    def create_job(self, template_path, job_name, groovy_list=[], parameters=[], folder_name=''):
        """(Re)create a job from ``jenkins_templates.xml``.

        An existing job with the same full name is deleted first.

        Args:
            template_path: Directory containing ``jenkins_templates.xml``.
            job_name: Name of the job inside its folder.
            groovy_list: Files for which a ``load '<file>'`` line is inserted
                before every template line containing ``template``. Only read,
                never modified.
            parameters: Names of string parameters inserted before the line
                containing ``/parameterDefinitions``. Only read, never modified.
            folder_name: Folder of the job; ``'.'`` or ``''`` for the top level.
        """
        name_prefix, _ = self._split_folder(folder_name)
        job_fullname = f'{name_prefix}{job_name}'

        # delete old job
        if any(job['fullname'] == job_fullname for job in self.server.get_all_jobs()):
            self.server.delete_job(job_fullname)
            print(f"delete job {job_fullname} SUCCESS !")

        with open(f'{template_path}/jenkins_templates.xml', 'r', encoding='UTF-8') as template:
            template_lines = template.read().split('\n')

        my_job = []
        for line in template_lines:
            if '/parameterDefinitions' in line:
                # Each definition is prepended, so the last name in
                # ``parameters`` ends up first in the generated XML.
                for para_name in parameters:
                    para = (
                        "\n"
                        "<hudson.model.StringParameterDefinition>\n"
                        f"<name>{para_name}</name>\n"
                        f"<description> {para_name} </description>\n"
                        "<defaultValue> </defaultValue>\n"
                        "</hudson.model.StringParameterDefinition>\n"
                    )
                    line = para + '\n' + line
            if 'template' in line:
                # Prepending also reverses the order of the load statements.
                # NOTE(review): confirm that callers rely on this order.
                for groovy_file in groovy_list:
                    line = " load '" + groovy_file + "'" + '\n' + line
            my_job.append(line)
        job_config = '\n'.join(my_job)

        # create job
        self.server.create_job(job_fullname, job_config)
        print(f"create job {job_fullname} SUCCESS !")

    def run_job(self, job_name, run_para='', folder_name=''):
        """Trigger a parameterized build, wait for it and print its console log.

        Args:
            job_name: Name of the job inside its folder.
            run_para: Request body posted to ``buildWithParameters``.
            folder_name: Folder of the job; ``'.'`` or ``''`` for the top level.
        """
        # Local import so the file's own import block does not need to change.
        import subprocess

        name_prefix, url_path = self._split_folder(folder_name)
        job_fullname = f'{name_prefix}{job_name}'
        job_url = f'{self.url}/job{url_path}/{job_name}'
        credentials = f'{self.username}:{self.password}'

        # run job (argument list, no shell: nothing in the values is
        # interpreted by a shell)
        subprocess.run(['curl', '-s', '-X', 'POST', '--user', credentials,
                        f'{job_url}/buildWithParameters', '-d', run_para])
        time.sleep(2)
        print(job_fullname)

        # NOTE(review): if the new build is still queued after the sleep, the
        # newest entry here is the previous build.
        builds = self.server.get_job_info(job_fullname)['builds']
        if not builds:
            print(f"job {job_fullname} has no builds yet !")
            return
        last_build_number = builds[0]['number']

        # Poll the build status. Any final result ends the loop, so results
        # such as UNSTABLE no longer make it spin forever.
        while True:
            build_info = self.server.get_build_info(job_fullname, last_build_number)
            if not build_info['building'] and build_info['result'] is not None:
                break
            print(f"job {job_fullname} buildID #{last_build_number} status is running !")
            time.sleep(2)
        print(f"run job {job_fullname} buildID #{last_build_number} status is {build_info['result']} !")

        # Fetch the log of the build that was just watched (not build 1).
        console = subprocess.run(
            ['curl', '--user', credentials, f'{job_url}/{last_build_number}/consoleText'],
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        print(console.stdout)

    def stop_job(self, job_name, folder_name=''):
        """Stop the newest build of a job and wait until it has finished.

        Args:
            job_name: Name of the job inside its folder.
            folder_name: Folder of the job; ``'.'`` or ``''`` for the top level.
        """
        # Local import so the file's own import block does not need to change.
        import subprocess

        name_prefix, url_path = self._split_folder(folder_name)
        job_fullname = f'{name_prefix}{job_name}'

        builds = self.server.get_job_info(job_fullname)['builds']
        if not builds:
            print(f"job {job_fullname} has no builds yet !")
            return
        last_build_number = builds[0]['number']

        stop_url = f'{self.url}/job{url_path}/{job_name}/{last_build_number}/stop'
        stop_argv = ['curl', '-s', '-X', 'POST', '--user',
                     f'{self.username}:{self.password}', stop_url]
        # Show the command with the password masked.
        print(f'curl -s -X POST --user {self.username}:**** {stop_url}')

        # Keep requesting a stop until the build reports a final result.
        while True:
            build_info = self.server.get_build_info(job_fullname, last_build_number)
            if not build_info['building'] and build_info['result'] is not None:
                break
            print(f"job {job_fullname} buildID #{last_build_number} status is running !")
            subprocess.run(stop_argv, stdout=subprocess.DEVNULL)
            time.sleep(2)
        print(f"job {job_fullname} buildID #{last_build_number} stop finished !")
        print(f"job {job_fullname} buildID #{last_build_number} status is {build_info['result']} !")
节点管理(jk_node.py):
[Python] 纯文本查看 复制代码
import os
import wget
import jenkins
from jk_auth import auth
class node(auth):
    """Create and delete Jenkins nodes."""

    def __init__(self, url, username, password):
        """Set up the connection; see ``auth.__init__`` for the arguments."""
        super().__init__(url, username, password)

    def delete_node(self, node_name):
        """Delete the node ``node_name`` through ``jenkins-cli.jar``.

        Nothing is done when no node with that name is listed by the server.

        Args:
            node_name: Name of the node to remove; ``'master'`` is refused.

        Raises:
            SystemExit: With code ``1`` when ``node_name`` is ``'master'``.
        """
        # Local import so the file's own import block does not need to change.
        import subprocess

        if node_name == 'master':
            print("node name can not use 'master'!")
            raise SystemExit(1)

        # delete node
        if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
        for node_i in self.server.get_nodes():
            if node_name == node_i['name']:
                cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
                # Argument list (no shell): the node name and credentials are
                # passed verbatim and cannot inject shell syntax.
                subprocess.run([
                    'java', '-jar', cli_jar,
                    '-auth', f'{self.username}:{self.password}',
                    '-s', self.url,
                    'delete-node', node_name,
                ])
                # One deletion is enough; stop scanning the node list.
                break

    def create_node(self, node_name, port, username, cre_id, host):
        """Create a node that is launched over SSH.

        Args:
            node_name: Name (and description) of the new node.
            port: Passed to the launcher as ``port``.
            username: Passed to the launcher as ``username``.
            cre_id: Passed to the launcher as ``credentialsId``.
            host: Passed to the launcher as ``host``.

        Returns:
            The message ``"create node <node_name> ok !"``.

        Raises:
            SystemExit: With code ``-1`` when a node with that name exists.
        """
        if any(node_name == node_i['name'] for node_i in self.server.get_nodes()):
            print(f"node name {node_name} is present, plase delete after !")
            raise SystemExit(-1)

        # create node
        params = {
            'port': port,
            'username': username,
            'credentialsId': cre_id,
            'host': host,
        }
        self.server.create_node(
            node_name,
            nodeDescription=node_name,
            remoteFS='~/',
            labels='',
            exclusive=True,
            launcher=jenkins.LAUNCHER_SSH,
            launcher_params=params,
        )
        return "create node " + node_name + " ok !"
|
|