Matthewl 发表于 2015-4-27 10:51:25

python类库32[多进程通信Queue+Pipe+Value+Array]

  

  多进程通信
  queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。
  
  1)Queue & JoinableQueue
  queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。
  multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。


  
  task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
  join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
  

  代码:


import multiprocessing
import time
class Consumer(multiprocessing.Process):
   
    def __init__(self, task_queue, result_queue):
      multiprocessing.Process.__init__(self)
      self.task_queue = task_queue
      self.result_queue = result_queue
    def run(self):
      proc_name = self.name
      while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print ('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print ('%s: %s' % (proc_name, next_task))
            answer = next_task() # __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
      return

class Task(object):
    def __init__(self, a, b):
      self.a = a
      self.b = b
    def __call__(self):
      time.sleep(0.1) # pretend to take some time to do the work
      return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
      return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
   
    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print ('Creating %d consumers' % num_consumers)
    consumers = [ Consumer(tasks, results)
                  for i in range(num_consumers) ]
    for w in consumers:
      w.start()
   
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
      tasks.put(Task(i, i))
   
    # Add a poison pill for each consumer
    for i in range(num_consumers):
      tasks.put(None)
    # Wait for all of the tasks to finish
    tasks.join()
   
    # Start printing results
    while num_jobs:
      result = results.get()
      print ('Result:', result)
      num_jobs -= 1  注意小技巧: 使用None来表示task处理完毕。
  
  运行结果:


  
  2) pipe
  

pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。  
  
  代码:
  


from multiprocessing import Process, Pipe
def f(conn):
    conn.send()
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())   # prints ""  
  
  3)Value + Array
  Value + Array 是python中共享内存 映射文件的方法,速度比较快。
  

from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = n.value + 1
    for i in range(len(a)):
      a = a * 10
if __name__ == '__main__':
    num = Value('i', 1)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
   
    p2 = Process(target=f, args=(num, arr))
    p2.start()
    p2.join()
    print(num.value)
    print(arr[:])
# the output is :
# 2
#
# 3
#
  

  参考:
The Python Standard Library By Example
  http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
  
  完!
页: [1]
查看完整版本: python类库32[多进程通信Queue+Pipe+Value+Array]