zhuguojun6 发表于 2020-9-17 20:48:08

python脚本管理jenkins

本帖最后由 zhuguojun6 于 2020-9-17 20:53 编辑

python脚本管理jenkins个人开发,仅供 参考,谢谢


登陆类(jk_auth.py):
import jenkins
class auth(object):
    def __init__(self, url, username, password):
      self.url = url
      self.username = username
      self.password = password
      self.server = jenkins.Jenkins(self.url, self.username, self.password)

证书管理(jk_credentials.py):
import os
import wget
import time
from jk_auth import *


class credential(auth):
    def __init__(self, url, username, password):
      auth.__init__(self, url, username, password)
      self.url = url
      self.username = username
      self.password = password

    def delete_cre(self, id, folder_name):
      if folder_name == '.':
            if os.path.exists('jenkins-cli.jar'):
                os.remove('jenkins-cli.jar')
            cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
            # cre_list_cmd = f'java -jar {cli_jar} -auth {self.username}:{self.password} -s {self.url} list-credentials system::system::jenkins'
            # cre_res = os.popen(cre_list_cmd)
            # cre_list = cre_res.readlines()
            del_cre_cmd = f'java -jar {cli_jar}-s {self.url} delete-credentials system::system::jenkins _ {id} --username {self.username} --password {self.password}'
            # print(del_cre_cmd)
            os.popen(del_cre_cmd)
            time.sleep(2)
            print(f"credential {folder_name}/{id} is absent !")

      else:
            cre_status = self.server.credential_exists(id, folder_name)
            if cre_status:
                res = self.server.delete_credential(id, folder_name)
                print(res.read())
                print(f"delete credential {folder_name}/{id} ok !")
                print(f"credential {folder_name}/{id} is absent !")
      return ''

    def create_cre(self, id, folder_name, template_path, id_type, content, username=''):
      my_cre = []
      # if os.path.exists(f"{template_path}/credential_{id}.xml"):
      #   os.remove(f"{template_path}/credential_{id}.xml")
      if id_type == 'ssh_private':
            template = open(f'{template_path}/Credentials_SSH_PrivateKey.xml', 'r', encoding='UTF-8')
      elif id_type == 'user_pass':
            template = open(f'{template_path}/Credentials_Username_with_password.xml', 'r', encoding='UTF-8')
      elif id_type == 'secret_txt':
            template = open(f'{template_path}/Credentials_SecretText.xml', 'r', encoding='UTF-8')
      else:
            print("credential type error !")
            exit(-1)
      template_content = template.read().split('\n')
      for line in template_content:
            if id_type == 'ssh_private':
                if '<id>' in format(line):
                  line = f'<id>{id}</id>'
                elif '<username>' in format(line):
                  line = f'<username>{username}</username>'
                elif '</privateKey>' in format(line):
                  line = content + '\n' + line
            elif id_type == 'user_pass':
                if '<id>' in format(line):
                  line = f'<id>{id}</id>'
                elif '<username>' in format(line):
                  line = f'<username>{username}</username>'
                elif '</password>' in format(line):
                  line = content + '\n' + line
            elif id_type == 'secret_txt':
                if '<id>' in format(line):
                  line = f'<id>{id}</id>'
                elif '</secret>' in format(line):
                  line = content + '\n' + line
            my_cre.append(line)

      if folder_name == '.':
            if os.path.exists(f"{template_path}/credential_sys_{id}.xml"):
                os.remove(f"{template_path}/credential_sys_{id}.xml")
            cre_file = open(f"{template_path}/credential_sys_{id}.xml", 'a')
            for i in my_cre:
                cre_file.write(i + "\n")
            cre_file.close()

            if os.path.exists('jenkins-cli.jar'):
                os.remove('jenkins-cli.jar')
            cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
            add_cre_cmd = f'java -jar {cli_jar} -s {self.url} create-credentials-by-xmlsystem::system::jenkins _< {template_path}/credential_sys_{id}.xml --username {self.username} --password {self.password} '
            #create sys credentials
            # print(add_cre_cmd)
            os.popen(add_cre_cmd)
            time.sleep(5)
            os.remove(f"{template_path}/credential_sys_{id}.xml")
      else:
            credential_xml = format(''.join(my_cre))
            self.server.create_credential(folder_name, credential_xml)
      print(f"create credential {folder_name}/{id} SUCCESS!")
      return ''


任务管理(jk_jobs.py):
import os
import time
import jenkins
from jk_auth import *


class jobs(auth):
    def __init__(self, url, username, password):
      auth.__init__(self, url, username, password)
      self.url = url
      self.username = username
      self.password = password

    def create_folder(self, folder_fullname):
      if folder_fullname != '.':
            folder_path = ''
            job_dic = []
            jobs_list = self.server.get_all_jobs()
            folder_depth = format(folder_fullname).split("/")
            for job in jobs_list:
                # print(job['fullname'])
                job_dic.append(job['fullname'])
            for folder_i in folder_depth:
                folder_path = str(folder_path) + format("/") + str(folder_i)
                print(folder_path.strip('/'))
                if folder_path.strip('/') in job_dic:
                  print(f"folder {folder_path.strip('/')} is present.")

                else:
                  self.server.create_job(folder_path.strip('/'), jenkins.EMPTY_FOLDER_XML)
                  print(f"folder {folder_path.strip('/')} create SUCCESS.")
      else:
            print(f"folder system is present.")

    def create_job(self, template_path, job_name, groovy_list=[], parameters=[], folder_name=''):

      my_job = []
      if folder_name != '.':
            folder_name = folder_name + '/'
      else:
            folder_name = ''
      # delete old job
      jobs_list = self.server.get_all_jobs()
      for job in jobs_list:
            if f'{folder_name}{job_name}' == job['fullname']:
                self.server.delete_job(f'{folder_name}{job_name}')
                print(f"delete job {folder_name}{job_name} SUCCESS !")

      template = open(f'{template_path}/jenkins_templates.xml', 'r', encoding='UTF-8')
      template_content = template.read().split('\n')
      # print(template_content)
      for line in template_content:
            if '/parameterDefinitions' in format(line):
                for para_name in parameters:
                  para = f"""
                  <hudson.model.StringParameterDefinition>
                  <name>{para_name}</name>
                  <description> {para_name} </description>
                  <defaultValue></defaultValue>
                  </hudson.model.StringParameterDefinition>
                  """

                  line = format(para) + '\n' + line
            if 'template' in format(line):
                for groovy_file in groovy_list:
                  line = "    load '" + groovy_file + "'" + '\n'+ line
            my_job.append(line)
      job_config = format('\n'.join(my_job))

      #
      # for i in my_job:
      #   job_file = open(f"{template_path}/{job_name}.xml", 'a')
      #   job_file.write(i+"\n")
      #   job_file.close()
      # job_content = open(f"{template_path}/{job_name}.xml", 'r', encoding='UTF-8')
      # job_config = format(job_content.read())
      #create job
      self.server.create_job(f'{folder_name}{job_name}', job_config)
      print(f"create job {folder_name}{job_name} SUCCESS !")

    # def run_job(self, job_name, run_para={}):
    #   self.server.build_job(job_name, run_para)
    #   build_info = self.server.get_build_info(job_name, last_build_number)
    #   print(build_info)

    def run_job(self, job_name, run_para='', folder_name=''):
      folder_depth = format(folder_name).split("/")
      folder_path = ''
      if folder_name != '.':
            for folder_l in folder_depth:
                folder_path = folder_path + '/' + folder_l + '/job'
            folder_name = folder_name + '/'
      else:
            folder_name = ''
      # run job
      run_cmd = f'curl -s -X POST --user {self.username}:{self.password} ' \
                  f'{self.url}/job{folder_path}/{job_name}/buildWithParameters -d "{run_para}"'

      os.system(run_cmd)
      time.sleep(2)
      print(f'{folder_name}{job_name}')
      last_build_number = self.server.get_job_info(f'{folder_name}{job_name}')['builds']['number']
      # print(last_build_number)
      #查询任务状态
      job_stat = 'TRUE'
      while job_stat == 'TRUE':
            build_info = self.server.get_build_info(f'{folder_name}{job_name}', last_build_number)
            if build_info['result'] in ['SUCCESS', 'FAILURE', 'ABORTED']:
                job_stat = build_info['building']
            else:
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is running !")
                time.sleep(2)
            if job_stat != 'TRUE':
                print(f"run job {folder_name}{job_name} buildID #{last_build_number} status is {build_info['result']} !")
      # 获取日志
      console_cmd = f'curl --user {self.username}:{self.password} ' \
                      f'{self.url}/job{folder_path}/{job_name}/1/consoleText'
      console_out = os.popen(console_cmd)
      print(console_out.read())

    def stop_job(self, job_name, folder_name=''):
      folder_depth = format(folder_name).split("/")
      folder_path = ''
      if folder_name != '.':
            for folder_l in folder_depth:
                folder_path = folder_path + '/' + folder_l + '/job'
            folder_name = folder_name + '/'
      else:
            folder_name = ''

      last_build_number = self.server.get_job_info(f'{folder_name}{job_name}')['builds']['number']
      # print(last_build_number)
      stop_cmd = f'curl -s -X POST --user {self.username}:{self.password} ' \
                   f'{self.url}/job{folder_path}/{job_name}/{last_build_number}/stop'

      job_stat = 'TRUE'
      print(stop_cmd)
      while job_stat == 'TRUE':
            build_info = self.server.get_build_info(f'{folder_name}{job_name}', last_build_number)
            if build_info['result'] not in ['SUCCESS', 'FAILURE', 'ABORTED']:
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is running !")
                os.popen(stop_cmd)
                time.sleep(2)
            else:
                job_stat = build_info['building']
                print(f"job {folder_name}{job_name} buildID #{last_build_number} stop finished !")
                print(f"job {folder_name}{job_name} buildID #{last_build_number} status is {build_info['result']} !")






节点管理(jk_node.py):
import os
import wget
import jenkins
from jk_auth import auth


class node(auth):
    def __init__(self, url, username, password):
      auth.__init__(self, url, username, password)
      self.url = url
      self.username = username
      self.password = password

    def delete_node(self, node_name):
      if node_name == 'master':
            print("node name can not use 'master'!")
            exit(1)
      # delete node
      if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
      node_info = self.server.get_nodes()
      for node_i in node_info:
            if node_name == (node_i['name']):
                cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
                os.system(
                  f'java -jar {cli_jar} -auth {self.username}:{self.password} -s {self.url} delete-node {node_name}')

    def create_node(self, node_name, port, username, cre_id, host):
      node_info = self.server.get_nodes()
      for node_i in node_info:
            if node_name == (node_i['name']):
                print(f"node name {node_name} ispresent, plase delete after !")
                exit(-1)
      # create node#
      params = {
            'port': port,
            'username': username,
            'credentialsId': cre_id,
            'host': host
      }
      self.server.create_node(
            node_name,
            nodeDescription=node_name,
            remoteFS='~/',
            labels='',
            exclusive=True,
            launcher=jenkins.LAUNCHER_SSH,
            launcher_params=params
      )
      return "create node " + node_name + " ok !"



页: [1]
查看完整版本: python脚本管理jenkins