设为首页 收藏本站
查看: 592|回复: 0

[经验分享] python进程池剖析(三)

[复制链接]

尚未签到

发表于 2015-11-29 10:15:58 | 显示全部楼层 |阅读模式
    之前文章对python中进程池的原理、数据流以及应用从代码角度做了简单的剖析,现在让我们回头看看标准库中对进程池的实现都有哪些值得我们学习的地方。我们知道,进程池内部由多个线程互相协作,向客户端提供可靠的服务,那么这些线程之间是怎样做到数据共享与同步的呢?在客户端使用apply/map函数向进程池分配任务时,使用self._taskqueue来存放任务元素,_taskqueue定义为Queue.Queue(),这是一个python标准库中的线程安全的同步队列,它保证通知时刻只有一个线程向队列添加或从队列获取元素。这样,主线程向进程池中分配任务(taskqueue.put),进程池中_handle_tasks线程读取_taskqueue队列中的元素,两个线程同时操作taskqueue,互不影响。进程池中有N个worker进程在等待任务下发,那么进程池中的_handle_tasks线程读取出任务后,又如何保证一个任务不被多个worker进程获取到呢?我们来看下_handle_tasks线程将任务读取出来之后如何交给worker进程的:



for taskseq, set_length in iter(taskqueue.get, None):
i = -1
for i, task in enumerate(taskseq):
if thread._state:
debug('task handler found thread._state != RUN')
break
try:
put(task)
except Exception as e:
job, ind = task[:2]
try:
cache[job]._set(ind, (False, e))
except KeyError:
pass
else:
if set_length:
debug('doing set_length()')
set_length(i+1)
continue
break
else:
debug('task handler got sentinel')
在从taskqueue中get到任务之后,对任务中的每个task,调用了put函数,这个put函数实际上是将task放入了管道,而主进程与worker进程的交互,正是通过管道来完成的。
再来看看worker进程的定义:
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild)
)
其中self._inqueue和self._outqueue为SimpleQueue()对象,实际是带锁的管道,上述_handle_task线程调用的put函数,即为SimpleQueue对象的方法。我们看到,这里worker进程定义均相同,所以进程池中的worker进程共享self._inqueue和self._outqueue对象,那么当一个task元素被put到共享的_inqueue管道中时,如何确保只有一个worker获取到呢,答案同样是加锁,在SimpleQueue()类的定义中,put以及get方法都带有锁,进行同步,唯一不同的是,这里的锁是用于进程间同步的。这样就保证了多个worker之间能够确保任务的同步。与分配任务类似,在worker进程运行完之后,会将结果put会_outqueue,_outqueue同样是SimpleQueue类对象,可以在多个进程之间进行互斥。
    在worker进程运行结束之后,会将执行结果通过管道传回,进程池中有_handle_result线程来负责接收result,取出之后,通过调用_set方法将结果写回ApplyResult/MapResult对象,客户端可以通过get方法取出结果,这里通过使用条件变量进行同步,当_set函数执行之后,通过条件变量唤醒阻塞在get函数的主进程。
    进程池终止工作通过调用Pool.terminate()来实现,这里的实现很巧妙,用了一个可调用对象,将终止Pool时的需要执行的回调函数先注册好,等到需要终止时,直接调用对象即可。



self._terminate = Finalize(
    self, self._terminate_pool,
    args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._worker_handler, self._task_handler,
self._result_handler, self._cache),
    exitpriority=15
)
在Finalize类的实现了__call__方法,在运行self._terminate()时,就会调用构造self._terminate时传入的self._terminate_pool对象。
    使用map/map_async函数向进程池中批量分配任务时,使用了生成器表达式:



self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None))
生成器表达式很简单,只需把列表解析的的[]换成()即可,上述表达的列表解析表示为:
[(result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)]
这里使用生成器表达式的好处是,它相当于列表解析的扩展,是对内存有好的,因为它只是生成了一个生成器,当我们需要使用该生成器对应的逻辑目标数据时,它才会通过既定逻辑去生成该数据,所以不会大量占用内存。
    在Pool中,_worker_handler线程负责监控、创建新的工作进程,在监控工作进程退出时,同时将退出的进程从进程池中删除掉。这类似于,一边遍历一边删除列表。我们来看下下面代码的实现:



>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in l:
if i in [3, 4, 5]:
l.remove(i)

>>> l
[1, 2, 3, 4, 5]
  我们看到l没有将所有的3和4都删除掉,这是因为remove改变了l的大小。再看下面的实现:



>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in range(len(l)):
if l in [3, 4]:
del l

Traceback (most recent call last):
File "<pyshell#37>", line 2, in <module>
if l in [3, 4]:
IndexError: list index out of range
>>>
  同样因为del l时,l的大小改变,继续访问下去导致访问越界。而标准库中的进程池给出了遍历删除的一个正确示例:



for i in reversed(range(len(self._pool))):
worker = self._pool
if worker.exitcode is not None:
worker.join()
cleaned = True
del self._pool
  使用reversed,从后向前删除list中的元素,这样会保证所有符合删除条件的元素被删除掉:



>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in reversed(range(len(l))):
if l in [3, 4, 5]:
del l

>>> l
[1, 2]
    可以看出,一个篇幅并不算大的Pool模块,就有很多值得学习的地方。对于python亦或者其他语言,技能的提升,多阅读标准库中代码,是一个很不错的选择。对于我们经常使用,而不知其中实现奥秘的源码,多阅读源码,了解其技术实现,就像侯捷那本《STL源码剖析》中讲到的,源码之前,了无秘密。更重要的是,将这些漂亮而又高效的编码方式,运用在自己的工作中,让自己的代码也可以像标准库中的代码一样优雅,这可以说是每一个开发人员的追求。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144827-1-1.html 上篇帖子: Python开发入门与实战10-事务 下篇帖子: .Net程序员之Python基础教程学习----判断条件与循环[Fourth Day]
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表