|
1、Process类
from multiprocessing import Process
def func(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=func, args=('bob',))
p.start()
p.join()
2、上下文和开始方法
开始方法
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
上下文
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
3、进程间交换对象
Queues
from multiprocessing import Process, Queue
def func(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=func, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Pipes
from multiprocessing import Process, Pipe
def func(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
4、进程间同步
例如你可以使用一个锁来确保只有一个进程打印到标准输出
from multiprocessing import Process, Lock
def func(lock, i):
lock.acquire()
try:
print('hello world', i)
finally:
lock.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=func, args=(lock, num)).start()
5、进程间共享状态
Shared memory
使用 Value 或 Array,数据可以存储在一个共享内存映射 。例如,下面的代码
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a = -a
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
参数' d '表示双精度浮点数,参数“i”表示一个带符号整数。这些共享对象将进程和线程安全的。
Server process
通过 Manager() ,返回一个管理器对象,用以控制持有Python对象的服务进程,并允许其他进程使用代理来操作它们。
通过 Manager() 返回的管理器对象支持的类型有:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
服务进程管理器比使用共享内存对象更加灵活,因为他们可以支持任意的对象类型。同样,一个管理器可以在不同的电脑通过网络共享的过程。然而,他们慢于使用共享内存。
6、使用进程池
from multiprocessing import Pool
from time import sleep
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(10)" asynchronously
res = pool.apply_async(f, [10])
print(res.get(timeout=1)) # prints "100"
# make worker sleep for 10 secs
res = pool.apply_async(sleep, [10])
print(res.get(timeout=1)) # raises multiprocessing.TimeoutError
# exiting the 'with'-block has stopped the pool
|
|