设为首页 收藏本站
查看: 852|回复: 0

[经验分享] Python进程学习

[复制链接]

尚未签到

发表于 2018-8-5 07:05:43 | 显示全部楼层 |阅读模式
  线程及进程概念可自行学习
  Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
  子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
  常用方法:
  multiprocessing.cpu_count()    计算当前计算机有几个CPU可用
  multiprocessing.active_children()    查看当前还活着的子进程
  p.is_alive()    查看当前进程是否存活
  p.join()    进程的阻塞,如果join中无参数,则等待进程运行完后继续执行主函数,如果join有timeout参数,则超出timeout时间后继续执行主函数,不等待进程返回结果
  p.name()    输出p进程的名字
  p.pid()    输出p进程的pid是多少
  p.start()    开始p进程,与run()方法相同
  Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:
  例子:
  import os
  print 'Process (%s) start...' % os.getpid()
  pid = os.fork()
  if pid==0:
  print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
  else:
  print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
  输出:
  Process (876) start...
  I (876) just created a child process (877).
  I am child process (877) and my parent is 876.
  有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。
  multiprocessing
  由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。
  multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:
  例子:
  from multiprocessing import Process
  import os
  # 子进程要执行的代码
  def run_proc(name):
  print 'Run child process %s (%s)...' % (name, os.getpid())
  if __name__=='__main__':
  print 'Parent process %s.' % os.getpid()
  p = Process(target=run_proc, args=('test',))
  print 'Process will start.'
  p.start()
  p.join()
  print 'Process end.'
  输出:
  Parent process 928.
  Process will start.
  Run child process test (929)...
  Process end.
  创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
  join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
  例子:
  #创建子进程的方法
  import time
  import multiprocessing
  def worker(name,interval):
  print ("{0} start".format(name))
  time.sleep(interval)
  print ("{0} end".format(name))
  if __name__ == "__main__":
  print("main start")
  print (multiprocessing.cpu_count())
  #创建子进程,目标是那个函数,传递的参数都有哪些
  p1 = multiprocessing.Process(target=worker, args=("worker1",2))
  p2 = multiprocessing.Process(target=worker, args=("worker2",3))
  p3 = multiprocessing.Process(target=worker, args=("worker3",4))
  #启动进程
  p1.start()
  p2.start()
  p3.start()
  for i in multiprocessing.active_children():
  print ("The PID of {0} is {1}".format(i.name, i.pid))
  print("main end")
  输出:
  main start
  4
  The PID of Process-1 is 1588
  The PID of Process-3 is 6216
  The PID of Process-2 is 5724
  main end
  worker1 start
  worker2 start
  worker3 start
  worker1 end
  worker2 end
  worker3 end
  Pool
  如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
  例子:
  from multiprocessing import Pool
  import os, time, random
  def long_time_task(name):
  print 'Run task %s (%s)...' % (name, os.getpid())
  start = time.time()
  time.sleep(random.random() * 3)
  end = time.time()
  print 'Task %s runs %0.2f seconds.' % (name, (end - start))
  if __name__=='__main__':
  print 'Parent process %s.' % os.getpid()
  p = Pool()
  for i in range(5):
  p.apply_async(long_time_task, args=(i,))
  print 'Waiting for all subprocesses done...'
  p.close()
  p.join()
  print 'All subprocesses done.'
  输出:
  Parent process 669.
  Waiting for all subprocesses done...
  Run task 0 (671)...
  Run task 1 (672)...
  Run task 2 (673)...
  Run task 3 (674)...
  Task 2 runs 0.14 seconds.
  Run task 4 (673)...
  Task 1 runs 0.27 seconds.
  Task 3 runs 0.86 seconds.
  Task 0 runs 1.41 seconds.
  Task 4 runs 1.91 seconds.
  All subprocesses done.
  代码解读:
  对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
  请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:
  p = Pool(5)
  就可以同时跑5个进程。
  由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
  进程间通信
  Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
  我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
  例子:
  from multiprocessing import Process, Queue
  import os, time, random
  # 写数据进程执行的代码:
  def write(q):
  for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())
  # 读数据进程执行的代码:
  def read(q):
  while True:
  value = q.get(True)
  print 'Get %s from queue.' % value
  if __name__=='__main__':
  # 父进程创建Queue,并传给各个子进程:
  q = Queue()
  pw = Process(target=write, args=(q,))
  pr = Process(target=read, args=(q,))
  # 启动子进程pw,写入:
  pw.start()
  # 启动子进程pr,读取:
  pr.start()
  # 等待pw结束:
  pw.join()
  # pr进程里是死循环,无法等待其结束,只能强行终止:
  pr.terminate()
  输出:
  Put A to queue...
  Get A from queue.
  Put B to queue...
  Get B from queue.
  Put C to queue...
  Get C from queue.
  在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。
  多进程锁
  例子:
  import multiprocessing
  import time
  def add(number, value, lock):
  #获取锁
  lock.acquire()
  #异常的捕获
  try:
  print ("add{0} number = {1}".format(value, number))
  for i in xrange(1, 6):
  number += value
  time.sleep(1)
  print ("add{0} number = {1}".format(value, number))
  except Exception as e:
  raise e
  finally:
  #释放锁
  lock.release()
  if __name__ == "__main__":
  #锁的实例化
  lock = multiprocessing.Lock()
  number = 0
  #进程包含进程锁,p1和p2进程分别去抢锁,先抢到的先运行
  p1 = multiprocessing.Process(target=add, args=(number, 1, lock))
  p2 = multiprocessing.Process(target=add, args=(number, 3, lock))
  p1.start()
  p2.start()
  print ("main end")
  输出:
  main end
  add3 number = 0
  add3 number = 3
  add3 number = 6
  add3 number = 9
  add3 number = 12
  add3 number = 15
  add1 number = 0
  add1 number = 1
  add1 number = 2
  add1 number = 3
  add1 number = 4
  add1 number = 5
  例子:
  import multiprocessing
  import time
  def add(number, value, lock):
  #使用with lock写法来自动加锁及释放,与acquire和release相同
  with lock:
  print ("add{0} number = {1}".format(value, number))
  for i in xrange(1, 6):
  number += value
  time.sleep(1)
  print ("add{0} number = {1}".format(value, number))
  if __name__ == "__main__":
  #锁的实例化
  lock = multiprocessing.Lock()
  number = 0
  #进程包含进程锁,p1和p2进程分别去抢锁,先抢到的先运行
  p1 = multiprocessing.Process(target=add, args=(number, 1, lock))
  p2 = multiprocessing.Process(target=add, args=(number, 3, lock))
  p1.start()
  p2.start()
  print ("main end")
  输出:
  main end
  add1 number = 0
  add1 number = 1
  add1 number = 2
  add1 number = 3
  add1 number = 4
  add1 number = 5
  add3 number = 0
  add3 number = 3
  add3 number = 6
  add3 number = 9
  add3 number = 12
  add3 number = 15
  共享内存
  import multiprocessing
  import time
  def add(number, add_value):
  try:
  print ("add{0} number = {1}".format(add_value, number.value))
  for i in xrange(1, 6):
  number.value += add_value
  time.sleep(1)
  print ("add{0} number = {1}".format(add_value, number.value))
  except Exception as e :
  raise e
  if __name__ == "__main__":
  #number共享内存的实例化,number.value才可以使用共享内存操作,分别有value和array
  number = multiprocessing.Value('i', 0)
  p1 = multiprocessing.Process(target=add, args=(number, 1))
  p2 = multiprocessing.Process(target=add, args=(number, 3))
  p1.start()
  p2.start()
  print ("main end")
  输出:
  main end
  add1 number = 0
  add3 number = 1
  add1 number = 4
  add3 number = 5
  add1 number = 8
  add3 number = 9
  add1 number = 12
  add3 number = 13
  add1 number = 16
  add3 number = 17
  add1 number = 20
  add3 number = 20
  多进程manager管理
  manager可以接收多种类型的数据,相比较array和value功能更丰富
  例子:
  import multiprocessing
  def worker(d, l):
  l += range(11,16)
  for i in xrange(1,6):
  key = "key {0}".format(i)
  value = "value {0}".format(i)
  d[key] = value
  if __name__ == "__main__":
  #实例化manager
  manager = multiprocessing.Manager()
  #接收字典类型的数据
  d = manager.dict()
  #接收列表类型的数据
  l = manager.list()
  p = multiprocessing.Process(target=worker, args=(d, l))
  p.start()
  p.join()
  print (d)
  print (l)
  输出:
  {'key 1': 'value 1', 'key 2': 'value 2', 'key 3': 'value 3', 'key 4': 'value 4', 'key 5': 'value 5'}
  [11, 12, 13, 14, 15]
  进程池
  与MySQL连接池含义类似,创建连接池后所有进程都从进程池连接,超出进程池数量的进程会排队等待
  例子:
  import multiprocessing
  import time
  def fun1(message):
  print ("start {0}".format(message))
  time.sleep(1)
  print ("end {0}".format(message))
  if __name__ == "__main__":
  # 实例化进程池
  pool = multiprocessing.Pool(2)
  for i in xrange(1,10):
  message = "number is {0}".format(i)
  # apply_async是将进程池跑满,多进程同时操作
  pool.apply_async(func=fun1,args=(message,))
  pool.close()
  # 等待所有进程关闭,在join前需要close
  pool.join()
  输出:
  start number is 1
  start number is 2
  end number is 1
  start number is 3
  end number is 2
  start number is 4
  end number is 4
  start number is 5
  end number is 3
  start number is 6
  end number is 6
  start number is 7
  end number is 5
  start number is 8
  end number is 8
  end number is 7
  start number is 9
  end number is 9
  例子:
  import multiprocessing
  import time
  def fun1(message):
  print ("start {0}".format(message))
  time.sleep(1)
  print ("end {0}".format(message))
  if __name__ == "__main__":
  # 实例化进程池
  pool = multiprocessing.Pool(2)
  for i in xrange(1,10):
  message = "number is {0}".format(i)
  # apply是单进程,只有一个进程在运行
  pool.apply(func=fun1,args=(message,))
  pool.close()
  # 等待所有进程关闭,在join前需要close
  pool.join()
  输出:
  start number is 1
  end number is 1
  start number is 2
  end number is 2
  start number is 3
  end number is 3
  start number is 4
  end number is 4
  start number is 5
  end number is 5
  start number is 6
  end number is 6
  start number is 7
  end number is 7
  start number is 8
  end number is 8
  start number is 9
  end number is 9

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-546693-1-1.html 上篇帖子: python常用模块收录 下篇帖子: Python元组 ()
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表