NOTE:
A. Methods of a pool object should only be called by the process that created the pool.
B. In Python 2.7, the parameter maxtasksperchild is the number of tasks a worker process
can complete before it exits and is replaced by a fresh worker process.
This allows resources held by idle workers to be freed.
Its default value is None, which means worker processes live as long as the pool itself.
2). map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though).
It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.
The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
3). map_async(func, iterable[, chunksize[, callback]])
A variant of the map() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument.
When the result becomes ready callback is applied to it (unless the call failed).
callback should complete immediately since otherwise the thread which handles the results will get blocked.
4). imap(func, iterable[, chunksize])
An equivalent of itertools.imap().
The chunksize argument is the same as the one used by the map() method.
For very long iterables using a large value for chunksize can make the job complete much faster than
using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional
timeout parameter: next(timeout) will raise multiprocessing.TimeoutError
if the result cannot be returned within timeout seconds.
5). imap_unordered(func, iterable[, chunksize])
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary.
(Only when there is only one worker process is the order guaranteed to be “correct”.)
III. Application Examples
The following examples demonstrate the use of a pool.
1. Example 1
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow
    import time
    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises TimeoutError
2. Using Pool without collecting results
import multiprocessing
import time

def func(msg):
    for i in xrange(3):
        print msg
        time.sleep(1)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    for i in xrange(10):
        msg = "hello %d" % (i)
        pool.apply_async(func, (msg, ))
    pool.close()
    pool.join()
    print "Sub-process(es) done."
3. Using Pool and collecting results
More often, we not only need to run tasks in multiple processes but also need the
result of each task. The code is as follows:
import multiprocessing
import time

def func(msg):
    for i in xrange(3):
        print msg
        time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(10):
        msg = "hello %d" % (i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print res.get()
    print "Sub-process(es) done."
#!/usr/bin/env python
#coding=utf-8
"""
Author: Squall
Last modified: 2011-10-18 16:50
Filename: pool.py
Description: a simple sample for pool class
"""
from multiprocessing import Pool
from time import sleep

def f(x):
    for i in range(10):
        print '%s --- %s ' % (i, x)
        sleep(1)

def main():
    pool = Pool(processes=3)    # set the max number of worker processes to 3
    for i in range(11, 20):
        result = pool.apply_async(f, (i,))
    pool.close()
    pool.join()
    if result.successful():
        print 'successful'

if __name__ == "__main__":
    main()
This first creates a pool with a capacity of 3 processes and then submits f(i) to it in
turn. If you run the script and inspect the processes with ps aux | grep pool.py, you
will find that at most three worker processes are running at any time.
pool.apply_async() submits a task to the pool, and pool.join() waits for the worker
processes in the pool to finish, preventing the main process from exiting before the
workers do. Note that pool.join() must be called after pool.close() or
pool.terminate(). The difference between close() and terminate() is that close() stops
accepting new tasks and lets the workers finish their pending work before the pool
shuts down, while terminate() stops the workers immediately.
result.successful() reports whether the call completed without raising an exception;
if the result is not ready yet (i.e. some worker has not finished), calling it raises
an AssertionError.