设为首页 收藏本站
查看: 3123|回复: 0

[资源发布] python脚本管理jenkins

[复制链接]
累计签到:86 天
连续签到:1 天
发表于 2020-9-17 20:48:08 | 显示全部楼层 |阅读模式
本帖最后由 zhuguojun6 于 2020-9-17 20:53 编辑

python脚本管理jenkins个人开发,仅供 参考,谢谢


登陆类(jk_auth.py):
[Python] 纯文本查看 复制代码
import jenkins
class auth(object):
    def __init__(self, url, username, password):
        self.url = url
        self.username = username
        self.password = password
        self.server = jenkins.Jenkins(self.url, self.username, self.password)


证书管理(jk_credentials.py):
[Python] 纯文本查看 复制代码
import os
import wget
import time
from jk_auth import *


class credential(auth):
    def __init__(self, url, username, password):
        auth.__init__(self, url, username, password)
        self.url = url
        self.username = username
        self.password = password

    def delete_cre(self, id, folder_name):
        if folder_name == '.':
            if os.path.exists('jenkins-cli.jar'):
                os.remove('jenkins-cli.jar')
            cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
            # cre_list_cmd = f'java -jar {cli_jar} -auth {self.username}:{self.password} -s {self.url} list-credentials system::system::jenkins'
            # cre_res = os.popen(cre_list_cmd)
            # cre_list = cre_res.readlines()
            del_cre_cmd = f'java -jar {cli_jar}  -s {self.url} delete-credentials system::system::jenkins _ {id} --username {self.username} --password {self.password}'
            # print(del_cre_cmd)
            os.popen(del_cre_cmd)
            time.sleep(2)
            print(f"credential {folder_name}/{id} is absent !")

        else:
            cre_status = self.server.credential_exists(id, folder_name)
            if cre_status:
                res = self.server.delete_credential(id, folder_name)
                print(res.read())
                print(f"delete credential {folder_name}/{id} ok !")
                print(f"credential {folder_name}/{id} is absent !")
        return ''

    def create_cre(self, id, folder_name, template_path, id_type, content, username=''):
        my_cre = []
        # if os.path.exists(f"{template_path}/credential_{id}.xml"):
        #     os.remove(f"{template_path}/credential_{id}.xml")
        if id_type == 'ssh_private':
            template = open(f'{template_path}/Credentials_SSH_PrivateKey.xml', 'r', encoding='UTF-8')
        elif id_type == 'user_pass':
            template = open(f'{template_path}/Credentials_Username_with_password.xml', 'r', encoding='UTF-8')
        elif id_type == 'secret_txt':
            template = open(f'{template_path}/Credentials_SecretText.xml', 'r', encoding='UTF-8')
        else:
            print("credential type error !")
            exit(-1)
        template_content = template.read().split('\n')
        for line in template_content:
            if id_type == 'ssh_private':
                if '<id>' in format(line):
                    line = f'<id>{id}</id>'
                elif '<username>' in format(line):
                    line = f'<username>{username}</username>'
                elif '</privateKey>' in format(line):
                    line = content + '\n' + line
            elif id_type == 'user_pass':
                if '<id>' in format(line):
                    line = f'<id>{id}</id>'
                elif '<username>' in format(line):
                    line = f'<username>{username}</username>'
                elif '</password>' in format(line):
                    line = content + '\n' + line
            elif id_type == 'secret_txt':
                if '<id>' in format(line):
                    line = f'<id>{id}</id>'
                elif '</secret>' in format(line):
                    line = content + '\n' + line
            my_cre.append(line)

        if folder_name == '.':
            if os.path.exists(f"{template_path}/credential_sys_{id}.xml"):
                os.remove(f"{template_path}/credential_sys_{id}.xml")
            cre_file = open(f"{template_path}/credential_sys_{id}.xml", 'a')
            for i in my_cre:
                cre_file.write(i + "\n")
            cre_file.close()

            if os.path.exists('jenkins-cli.jar'):
                os.remove('jenkins-cli.jar')
            cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
            add_cre_cmd = f'java -jar {cli_jar} -s {self.url} create-credentials-by-xml  system::system::jenkins _  < {template_path}/credential_sys_{id}.xml --username {self.username} --password {self.password} '
            #create sys credentials
            # print(add_cre_cmd)
            os.popen(add_cre_cmd)
            time.sleep(5)
            os.remove(f"{template_path}/credential_sys_{id}.xml")
        else:
            credential_xml = format(''.join(my_cre))
            self.server.create_credential(folder_name, credential_xml)
        print(f"create credential {folder_name}/{id} SUCCESS!")
        return ''


任务管理(jk_jobs.py):
[Python] 纯文本查看 复制代码
import os
import time
import jenkins
from jk_auth import *


class jobs(auth):
    def __init__(self, url, username, password):
        auth.__init__(self, url, username, password)
        self.url = url
        self.username = username
        self.password = password

    def create_folder(self, folder_fullname):
        if folder_fullname != '.':
            folder_path = ''
            job_dic = []
            jobs_list = self.server.get_all_jobs()
            folder_depth = format(folder_fullname).split("/")
            for job in jobs_list:
                # print(job['fullname'])
                job_dic.append(job['fullname'])
            for folder_i in folder_depth:
                folder_path = str(folder_path) + format("/") + str(folder_i)
                print(folder_path.strip('/'))
                if folder_path.strip('/') in job_dic:
                    print(f"folder {folder_path.strip('/')} is present.")

                else:
                    self.server.create_job(folder_path.strip('/'), jenkins.EMPTY_FOLDER_XML)
                    print(f"folder {folder_path.strip('/')} create SUCCESS.")
        else:
            print(f"folder system is present.")

    def create_job(self, template_path, job_name, groovy_list=[], parameters=[], folder_name=''):

        my_job = []
        if folder_name != '.':
            folder_name = folder_name + '/'
        else:
            folder_name = ''
        # delete old job
        jobs_list = self.server.get_all_jobs()
        for job in jobs_list:
            if f'{folder_name}{job_name}' == job['fullname']:
                self.server.delete_job(f'{folder_name}{job_name}')
                print(f"delete job {folder_name}{job_name} SUCCESS !")

        template = open(f'{template_path}/jenkins_templates.xml', 'r', encoding='UTF-8')
        template_content = template.read().split('\n')
        # print(template_content)
        for line in template_content:
            if '/parameterDefinitions' in format(line):
                for para_name in parameters:
                    para = f"""
                    <hudson.model.StringParameterDefinition>
                    <name>{para_name}</name>
                    <description> {para_name} </description>
                    <defaultValue>  </defaultValue>
                    </hudson.model.StringParameterDefinition> 
                    """

                    line = format(para) + '\n' + line
            if 'template' in format(line):
                for groovy_file in groovy_list:
                    line = "    load '" + groovy_file + "'" + '\n'+ line
            my_job.append(line)
        job_config = format('\n'.join(my_job))

        #
        # for i in my_job:
        #     job_file = open(f"{template_path}/{job_name}.xml", 'a')
        #     job_file.write(i+"\n")
        #     job_file.close()
        # job_content = open(f"{template_path}/{job_name}.xml", 'r', encoding='UTF-8')
        # job_config = format(job_content.read())
        #create job
        self.server.create_job(f'{folder_name}{job_name}', job_config)
        print(f"create job {folder_name}{job_name} SUCCESS !")

    # def run_job(self, job_name, run_para={}):
    #     self.server.build_job(job_name, run_para)
    #     build_info = self.server.get_build_info(job_name, last_build_number)
    #     print(build_info)

    def run_job(self, job_name, run_para='', folder_name=''):
        folder_depth = format(folder_name).split("/")
        folder_path = ''
        if folder_name != '.':
            for folder_l in folder_depth:
                folder_path = folder_path + '/' + folder_l + '/job'
            folder_name = folder_name + '/'
        else:
            folder_name = ''
        # run job
        run_cmd = f'curl -s -X POST --user {self.username}:{self.password} ' \
                  f'{self.url}/job{folder_path}/{job_name}/buildWithParameters -d "{run_para}"'

        os.system(run_cmd)
        time.sleep(2)
        print(f'{folder_name}{job_name}')
        last_build_number = self.server.get_job_info(f'{folder_name}{job_name}')['builds'][0]['number']
        # print(last_build_number)
        #查询任务状态
        job_stat = 'TRUE'
        while job_stat == 'TRUE':
            build_info = self.server.get_build_info(f'{folder_name}{job_name}', last_build_number)
            if build_info['result'] in ['SUCCESS', 'FAILURE', 'ABORTED']:
                job_stat = build_info['building']
            else:
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is running !")
                time.sleep(2)
            if job_stat != 'TRUE':
                print(f"run job {folder_name}{job_name} buildID #{last_build_number} status is {build_info['result']} !")
        # 获取日志
        console_cmd = f'curl --user {self.username}:{self.password} ' \
                      f'{self.url}/job{folder_path}/{job_name}/1/consoleText'
        console_out = os.popen(console_cmd)
        print(console_out.read())

    def stop_job(self, job_name, folder_name=''):
        folder_depth = format(folder_name).split("/")
        folder_path = ''
        if folder_name != '.':
            for folder_l in folder_depth:
                folder_path = folder_path + '/' + folder_l + '/job'
            folder_name = folder_name + '/'
        else:
            folder_name = ''

        last_build_number = self.server.get_job_info(f'{folder_name}{job_name}')['builds'][0]['number']
        # print(last_build_number)
        stop_cmd = f'curl -s -X POST --user {self.username}:{self.password} ' \
                   f'{self.url}/job{folder_path}/{job_name}/{last_build_number}/stop'

        job_stat = 'TRUE'
        print(stop_cmd)
        while job_stat == 'TRUE':
            build_info = self.server.get_build_info(f'{folder_name}{job_name}', last_build_number)
            if build_info['result'] not in ['SUCCESS', 'FAILURE', 'ABORTED']:
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is running !")
                os.popen(stop_cmd)
                time.sleep(2)
            else:
                job_stat = build_info['building']
                print(f"job {folder_name}{job_name} buildID #{last_build_number} stop finished !")
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is {build_info['result']} !")






节点管理(jk_node.py):
[Python] 纯文本查看 复制代码
import os
import wget
import jenkins
from jk_auth import auth


class node(auth):
    def __init__(self, url, username, password):
        auth.__init__(self, url, username, password)
        self.url = url
        self.username = username
        self.password = password

    def delete_node(self, node_name):
        if node_name == 'master':
            print("node name can not use 'master'!")
            exit(1)
        # delete node
        if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
        node_info = self.server.get_nodes()
        for node_i in node_info:
            if node_name == (node_i['name']):
                cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
                os.system(
                    f'java -jar {cli_jar} -auth {self.username}:{self.password} -s {self.url} delete-node {node_name}')

    def create_node(self, node_name, port, username, cre_id, host):
        node_info = self.server.get_nodes()
        for node_i in node_info:
            if node_name == (node_i['name']):
                print(f"node name {node_name} is  present, plase delete after !")
                exit(-1)
        # create node#
        params = {
            'port': port,
            'username': username,
            'credentialsId': cre_id,
            'host': host
        }
        self.server.create_node(
            node_name,
            nodeDescription=node_name,
            remoteFS='~/',
            labels='',
            exclusive=True,
            launcher=jenkins.LAUNCHER_SSH,
            launcher_params=params
        )
        return "create node " + node_name + " ok !"




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-992800-1-1.html 上篇帖子: 有出这个视频资源的吗 下篇帖子: python selenium 管理jenkins用户权限
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表