设为首页 收藏本站
查看: 2399|回复: 0

[经验分享] 平台化之利用ansible api接口的async使任务异步

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-7-15 13:30:04 | 显示全部楼层 |阅读模式
今个也不前言了,直接切入主题 !


同事在做一个关于方便的批量上线初始化的平台,其实我真不想说平台两个字,啥都平台,显得有些不时尚了 。 ansible是个好东西,只是上线打包初始化的任务太耗时间,经常是20分钟左右。这时候需要异步的执行,对于ansible来说,客户端把消耗时间的任务通过signal信号来处理,把结果扔到以jid为文件名的文件里。。。
(这两天写个用tornado 实现ansible的web ui)   

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#xiaorui.cc
import sys
import time
import ansible.runner
runner = ansible.runner.Runner(
   module_name='shell',
   module_args=sys.argv[1],
   pattern='web',
)
datastructure = runner.run_async(30)
print datastructure
print datastructure[1].poll()
time.sleep(3)
print datastructure[1].poll()
time.sleep(3)
print datastructure[1].poll()
time.sleep(3)
print datastructure[1].poll()




终端执行的结果:
1
2
3
4
5
6
[iyunv@67 ~]# python cmd.py "sleep 6;echo 1111"
({'dark': {}, 'contacted': {'10.10.10.66': {u'started': 1, 'invocation': {'module_name': 'shell', 'module_args': 'sleep 6;echo 1111'}, u'results_file': u'/root/.ansible_async/637134991839', u'ansible_job_id': u'637134991839'}}}, <ansible.runner.poller.AsyncPoller object at 0x9d75c2c>)
{'dark': {}, 'polled': {'10.10.10.66': {u'started': 1, 'invocation': {'module_name': 'async_status', 'module_args': u'jid=637134991839'}, u'results_file': u'/root/.ansible_async/637134991839', u'ansible_job_id': u'637134991839', u'changed': False}}, 'contacted': {}}
{'dark': {}, 'polled': {'10.10.10.66': {u'started': 1, 'invocation': {'module_name': 'async_status', 'module_args': u'jid=637134991839'}, u'results_file': u'/root/.ansible_async/637134991839', u'ansible_job_id': u'637134991839', u'changed': False}}, 'contacted': {}}
{'dark': {}, 'polled': {}, 'contacted': {'10.10.10.66': {u'changed': True, u'end': u'2014-06-05 07:06:19.426764', u'ansible_job_id': u'637134991839', u'stdout': u'1111', u'cmd': u'sleep 6;echo 1111 ', 'invocation': {'module_name': 'async_status', 'module_args': u'jid=637134991839'}, u'start': u'2014-06-05 07:06:13.420793', u'finished': 1, u'stderr': u'', u'rc': 0, u'delta': u'0:00:06.005971'}}}
{'dark': {}, 'polled': {}, 'contacted': {'10.10.10.66': {u'changed': True, u'end': u'2014-06-05 07:06:19.426764', u'ansible_job_id': u'637134991839', u'stdout': u'1111', u'cmd': u'sleep 6;echo 1111 ', 'invocation': {'module_name': 'async_status', 'module_args': u'jid=637134991839'}, u'start': u'2014-06-05 07:06:13.420793', u'finished': 1, u'stderr': u'', u'rc': 0, u'delta': u'0:00:06.005971'}}}




wKiom1OPRLXS8lMbAASdiMRI_fo505.jpg

在运行的过程中,我强制中断主进程。
因为任务是fork到守候进程,所以任务还是被执行的。

1
2
[iyunv@66 ~]# cat /root/.ansible_async/656009932164
{"changed": true, "end": "2014-06-05 07:10:15.741252", "stdout": "1111", "cmd": "sleep 10;echo 1111 ", "start": "2014-06-05 07:10:05.736109", "delta": "0:00:10.005143", "stderr": "", "rc": 0}




想知道他是怎么异步的获取状态的么? 
比起saltstack这种逆天的工具来说,ansible的返回值显得有点不时尚。
他的任务结果会输出到.ansible_aysnc的一个token文件里面的。然后你每次去poll任务,他都是拿着这个jid,然后ssh到客户端cat数据,然后把数据json.loads。

当执行异步任务的时候,我监控了22端口的行为数据。
配合上面的python调用ansible api 的那个脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.66:22 -> 10.10.10.67:33046 [A]

T 10.10.10.67:33046 -> 10.10.10.66:22 [A]

T 10.10.10.67:33046 -> 10.10.10.66:22 [A]

T 10.10.10.67:33046 -> 10.10.10.66:22 [A]

T 10.10.10.67:33046 -> 10.10.10.66:22 [A]

T 10.10.10.67:33046 -> 10.10.10.66:22 [AR
xiaorui.cc




进程的监控,会看到我每次poll数据的时候,都是派生了一个新进程。
1
2
3
4
5
[iyunv@66 /var]# ps aux|grep python
root      1216  0.0  1.9  74456 20392 ?        Sl   06:18   0:04 /usr/bin/python /usr/bin/salt-minion -d
root      6251  0.0  0.2   9312  2512 ?        S    07:45   0:00 /usr/bin/python /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/async_wrapper 701347917770 30 /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/command /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/arguments
root      6252  0.0  0.2   9312  2784 ?        S    07:45   0:00 /usr/bin/python /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/async_wrapper 701347917770 30 /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/command /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/arguments
root      6253  2.0  0.6  15000  6376 ?        S    07:45   0:00 /usr/bin/python /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/command /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/ar




看看客户端的那几个临时文件
1
2
3
4
5
6
7
[iyunv@66 ansible-tmp-1401896171.16-200148091205946]# ll
总用量 60
#xiaorui.cc
-rw-r--r--. 1 root root    28 6月   5 07:43 arguments
-rw-r--r--. 1 root root  6159 6月   5 07:43 async_wrapper
-rwxr-xr-x. 1 root root 47795 6月   5 07:43 command
[iyunv@66 ansible-tmp-1401896171.16-200148091205946]#





command是模块,用来执行linux命令的。argument文件里面是给command传递的参数,async_wrapper这个是关键。  是这次任务能异步起来的核心文件。



看这段代码可以看出ansible使用异步的时候,所采用的一些方法。

看了这个脚本,在结合上线咱们ps aux抓到的正在执行的ansible过来的ssh执行的命令。
/usr/bin/python /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/async_wrapper 701347917770 30 /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/command /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/arguments

给async_wrapper传递了四个参数,jid,超时的时间,引用的模块,传递给模块的参数。 jid给aysnc_wrapper,是为了把结果序列化json存入到jid文件里面。给的时间,是用来通过signal信号做超时的限制 !  模块和参数就不讲了,你应该懂!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/python
# -*- coding: utf-8 -*-


try:
    import json
except ImportError:
    import simplejson as json
import shlex
import os
import subprocess
import sys
import datetime
import traceback
import signal
import time
import syslog

def daemonize_self():
    # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
    # logger.info("cobblerd started")
    try:
        pid = os.fork()
        if pid > 0:
            # exit first parent
            sys.exit(0)
    except OSError, e:
        print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
        sys.exit(1)

    # decouple from parent environment
    os.chdir("/")
    os.setsid()
    os.umask(022)

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            # print "Daemon PID %d" % pid
            sys.exit(0)
    except OSError, e:
        print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
        sys.exit(1)

    dev_null = file('/dev/null','rw')
    os.dup2(dev_null.fileno(), sys.stdin.fileno())
    os.dup2(dev_null.fileno(), sys.stdout.fileno())
    os.dup2(dev_null.fileno(), sys.stderr.fileno())

if len(sys.argv) < 3:
    print json.dumps({
        "failed" : True,
        "msg"    : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>.  Humans, do not call directly!"
    })
    sys.exit(1)

jid = sys.argv[1]
time_limit = sys.argv[2]
wrapped_module = sys.argv[3]
argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)

syslog.openlog('ansible-%s' % os.path.basename(__file__))
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))

# setup logging directory
logdir = os.path.expanduser("~/.ansible_async")
log_path = os.path.join(logdir, jid)

if not os.path.exists(logdir):
    try:
        os.makedirs(logdir)
    except:
        print json.dumps({
            "failed" : 1,
            "msg" : "could not create: %s" % logdir
        })

def _run_command(wrapped_cmd, jid, log_path):

    logfile = open(log_path, "w")
    logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
    logfile.close()
    logfile = open(log_path, "w")
    result = {}

    outdata = ''
    try:
        cmd = shlex.split(wrapped_cmd)
        script = subprocess.Popen(cmd, shell=False,
            stdin=None, stdout=logfile, stderr=logfile)
        script.communicate()
        outdata = file(log_path).read()
        result = json.loads(outdata)

    except (OSError, IOError), e:
        result = {
            "failed": 1,
            "cmd" : wrapped_cmd,
            "msg": str(e),
        }
        result['ansible_job_id'] = jid
        logfile.write(json.dumps(result))
    except:
        result = {
            "failed" : 1,
            "cmd" : wrapped_cmd,
            "data" : outdata, # temporary debug only
            "msg" : traceback.format_exc()
        }
        result['ansible_job_id'] = jid
        logfile.write(json.dumps(result))
    logfile.close()

# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process

#import logging
#import logging.handlers

#logger = logging.getLogger("ansible_async")
#logger.setLevel(logging.WARNING)
#logger.addHandler( logging.handlers.SysLogHandler("/dev/log") )
def debug(msg):
    #logger.warning(msg)
    pass

try:
    pid = os.fork()
    if pid:
        # Notify the overlord that the async process started

        # we need to not return immmediately such that the launched command has an attempt
        # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
        # this probably could be done with some IPC later.  Modules should always read
        # the argsfile at the very first start of their execution anyway
        time.sleep(1)
        debug("Return async_wrapper task started.")
        print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : log_path })
        sys.stdout.flush()
        sys.exit(0)
    else:
        # The actual wrapper process

        # Daemonize, so we keep on running
        daemonize_self()

        # we are now daemonized, create a supervisory process
        debug("Starting module and watcher")

        sub_pid = os.fork()
        if sub_pid:
            # the parent stops the process after the time limit
            remaining = int(time_limit)

            # set the child process group id to kill all children
            os.setpgid(sub_pid, sub_pid)

            debug("Start watching %s (%s)"%(sub_pid, remaining))
            time.sleep(5)
            while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
                debug("%s still running (%s)"%(sub_pid, remaining))
                time.sleep(5)
                remaining = remaining - 5
                if remaining <= 0:
                    debug("Now killing %s"%(sub_pid))
                    os.killpg(sub_pid, signal.SIGKILL)
                    debug("Sent kill to group %s"%sub_pid)
                    time.sleep(1)
                    sys.exit(0)
            debug("Done in kid B.")
            os._exit(0)
        else:
            # the child process runs the actual module
            debug("Start module (%s)"%os.getpid())
            _run_command(cmd, jid, log_path)
            debug("Module complete (%s)"%os.getpid())
            sys.exit(0)

except Exception, err:
    debug("error: %s"%(err))
    raise err






话说,ansible为了兼容别的语言,还真是做了不少的牺牲。一切都是各种标准输入输出的。。。。哎,如果把ansible api 的async处理方式用在ansible的web平台上,有些不合适。当然,你如果耗得起貌似都ssh过去,认证,执行,然后扔到async_wrapper文件里面的过程,那你.......(nx)。 话说ansible真的不快......你真的可以试试。。。。
建议大家看看ansible api的源码,实现下returners的方式。  同事已经实现了,其实很简单,有兴趣的朋友,可以在看看我上次的那个ansible runner源码的文章,肯定能找到思路。   还有大家也可以通过修改 async_wrapper的代码实现returner。   这两天有时间会写点 ansible做web平台上遇到的问题。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-22080-1-1.html 上篇帖子: 探索ansible runner的源码及执行api原理 下篇帖子: 运维自动化之ansible playbook安装lnmp环境 接口
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表