设为首页 收藏本站
查看: 1586|回复: 0

[经验分享] 源码解读Saltstack运行机制之Job Runtime

[复制链接]

尚未签到

发表于 2018-8-1 09:53:59 | 显示全部楼层 |阅读模式
  鄙人深切以为
“在开源的世界里阅读源码才是深层次理解软件运行机制的不二法门”,本系列即是旨在通过解读(当然,鄙人实力有限,只是粗浅的解读)Saltstack源
码的方式,了解Saltstack内部的运行机制,对排障和更好理解和二次开发Saltstack有一定助力。
  本篇文章即解决”salt 如何从启动minion到执行一个job“的困惑,鄙人才疏学浅,有欠缺的地方还请海涵。
  > under salt master & minion 0.17.5
一个Salt Minion的启动过程
  从init.d下的启动脚本开始追溯可以定位启动代码位于salt下的__init__.py初始脚本里的Minion类,它会执行prepare方法来完成一系列的准备工作:

  •   读取配置文件,获取一系列的参数变量;
  •   调用Minion或MultiMinion方法,在这一步获取grains数据,并尝试和Master认证,Pillar的编译(?) ;
      (这里会做一次判断,如果minion配置文件里指向了多个master,则会调用salt.minion.MultiMinion方法来启动多个Minion Server instance,这里暂时只讨论调用Minion方法的情况)
  •   check_user判断当前调起Minion user是否符合conf文件描述,满足则进行Minion Server的tune in,也就是从初始化完成到真正Online working的微调过程;
  •   tune_in 是只有Minion Server或者说主Minion进程才能调用的微调方法,其建立Minion Pub\Pull
    socket,连接到Master的消息总线,并fire
    event声明该Minion已启动,并开始执行设定的scheduler(如果有),并进入一个无限循环,长期的监听event并去完成对应任务,至此
    Minion服务算是启动完毕。
def tune_in(self):  
        '''
  
        Lock onto the publisher.
  
        This is the main event
  
        loop for the minion
  
        '''
  
        ……
        # On first startup execute a state run if configured to do so  
        self._state_run()
  
        time.sleep(.5)
  
        #wjx add
  
        #here start the loop!!!
  
        loop_interval = int(self.opts['loop_interval'])
  
        while True:
基于Zeromq的消息总线
  Salt Master和Minion的交互主要借助Zeromq完成,所有的Job也是通过Zeromq消息总线来下发和接收。Zeromq通信这块相应的被掩盖在底层接口下,目前鄙人暂时未能搞清具体的运作机理,也就相应带过,后面可能专门写文理解一下。
清道夫——Minion Worker
_handle_payload 方法
  正如上文所介绍的那样,Salt Minion Server(Main Minion)在每次收到Master下发的Job时均会对应调起一个新的Worker进程(或线程,具体可以在/etc/salt/minion参数里予以设置,默认为 #multiprocessing: True ),这个操作便是在__handle_payload 方法里完成。
            #wjx add  
            #In main loop, Main Minion will call _handle_payload
  
            #if there is some poll data from Master
  
            socks = dict(self.poller.poll(
  
                loop_interval * 1000)
  
            )
  
            if self.socket in socks and socks[self.socket] == zmq.POLLIN:
  
                payload = self.serial.loads(self.socket.recv())
  
                self._handle_payload(payload)
How to handle payload ?
  那么,_handle_payload方法如何去处理Master端下发的Job呢?答案看源码便明了一二:
def _handle_payload(self, payload):  
    '''
  
    Takes a payload from the master publisher
  
    and does whatever the master wants done.
  
    '''
  
    {'aes': self._handle_aes,     'pub': self._handle_pub,
  
        'clear': self._handle_clear}[payload['enc']](payload['load'],
  
                                   payload['sig'] if 'sig' in payload else None)
  实际上是根据传入的payload
dict的enc键值来对应调用方法,最常见的应该是_handle_aes方法。然后,Minion便根据本地的minion_master.pub来
解密数据,得到data字典(这中间有段代码使用到pub key,但是目前没怎么看懂意思):
try:  
        data = self.crypticle.loads(load)
  解密之后的数据使用_handle_decoded_payload予以对应处理,在这里,Main Minion调用Multiprocessing模块或者Threading模块来唤醒一个新的Worker予以处理对应的job任务:
    #wjx add  
    #There is some difference between Linux and Win
  
    #Multiprocess or Multi-Thread is def in /etc/salt/minion
  
    #also, it will be diff if Master called multi-func like :
  
    #import salt.client
  
    #local = salt.client.LocalClient()
  
    #local.cmd( '*', [ 'test.ping','cmd.run' ], [ [],['w'] ] )
  

  
    instance = self
  
    if self.opts['multiprocessing']:
  
        if sys.platform.startswith('win'):
  
            # let python reconstruct the minion
  
            # on the other side if we're
  
            # running on windows
  
            instance = None
  
        process = multiprocessing.Process(
  
            target=target, args=(instance, self.opts, data)
  
        )    else:
  
        process = threading.Thread(
  
            target=target, args=(instance, self.opts, data)
  
        )
  在Main Minion唤起一个Worker去完成对应Job后,实际上这个Job就只跟Worker Minion有关了,Main
Minion重新回到监听event->处理Job这样的一个循环。新的Worker由以下入口(或者_thread_multi_return)
开始:
@classmethod  
def _thread_return(cls, minion_instance, opts, data):
  
    '''
  
    This method should be used as a threading target,
  
    start the actual minion side execution.
  
    '''
  
    # this seems awkward at first, but it's a workaround for Windows
  
    # multiprocessing communication.
  
    ……
  这里注意一点,在Minion完成decode过程时便由__load_modules导入了Minion端的
functions(也可以理解为modules)以及 returners
数据,所以此时便可以直接调用Master下发的job数据中指定的function:
def __load_modules(self):  
    '''
  
    Return the functions and the returners loaded up from the loader
  
    module
  
    '''
  
    self.opts['grains'] = salt.loader.grains(self.opts)
  
    functions = salt.loader.minion_mods(self.opts)
  
    returners = salt.loader.returners(self.opts, functions)
  
    return functions, returners
  Minion Worker 调用function执行Job:
    #wjx add  
    #use success tag to judge the job ret
  
    ret = {'success': False}
  
    function_name = data['fun']
  
    if function_name in minion_instance.functions:
  
        try:
  
            func = minion_instance.functions[data['fun']]
  
            args, kwargs = parse_args_and_kwargs(func, data['arg'], data)
  
            sys.modules[func.__module__].__context__['retcode'] = 0
  
            return_data = func(*args, **kwargs)
  之后便是Minion Worker反馈数据到Master以及Returner的Job归档:
    #wjx add  
    #send data to master by _return_pub function
  
    ret['jid'] = data['jid']
  
    ret['fun'] = data['fun']
  
    minion_instance._return_pub(ret)    if data['ret']:
  
        ret['id'] = opts['id']
  
        for returner in set(data['ret'].split(',')):
  
            try:
  
                minion_instance.returners['{0}.returner'.format(
  
                    returner
  
                )](ret)
  
            except Exception as exc:
  
                log.error(
  
                    'The return failed for job {0} {1}'.format(
  
                    data['jid'],
  
                    exc
  
                    )
  
                )
  如果一切顺利的话,_thread_return方法成功完成,Worker进程或线程便会自然消亡(到达process.join(),终止)。
总结
  除开细节上可能存在的一些模糊和错误之处需要完善,本文算是告一段落,主要就Minion角度考虑从启动到处理Job的整个运转过程,理解这样一个机制可能对一些Minion Worker的异常僵死的处理有一定助力。Salt的源码本身是一个优秀的Python编程样例,阅读其源码不单有助于理解Salt的运转机制,而且对Python开发也会有一定帮助,可以的话,鄙人会持续进行源码的挖掘。
  下篇文章可能会就salt\salt-run\salt-call等模块亦或是zeromq机制展开讨论,具体内容就看鄙人能不能写出来了……
  鄙人的博客原文地址:http://devopstarter.info/yuan-ma-jie-du-saltstackyun-xing-ji-zhi-zhi-job-runtime/

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-544577-1-1.html 上篇帖子: 使用Docker和saltstack构建运维paas管理平台 下篇帖子: 指定saltstack的任务id-fly天地
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表