设为首页 收藏本站
查看: 739|回复: 0

[经验分享] 第十五章 Python多进程与多线程

[复制链接]

尚未签到

发表于 2018-8-4 12:00:34 | 显示全部楼层 |阅读模式
  15.1 multiprocessing
  multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。
  有以下常用类:
  类
  描述
Process(group=None, target=None, name=None, args=(), kwargs={})派生一个进程对象,然后调用start()方法启动  Pool(processes=None, initializer=None, initargs=())
返回一个进程池对象,processes进程池进程数量Pipe(duplex=True)返回两个连接对象由管道连接Queue(maxsize=0)返回队列对象,操作方法跟Queue.Queue一样multiprocessing.dummy这个库是用于实现多线程  Process()类有以下些方法:
run()start()启动进程对象join([timeout])等待子进程终止,才返回结果。可选超时。name进程名字is_alive()返回进程是否存活daemon进程的守护标记,一个布尔值pid返回进程IDexitcode子进程退出状态码terminate()终止进程。在unix上使用SIGTERM信号,在windows上使用TerminateProcess()。  Pool()类有以下些方法:
apply(func, args=(), kwds={})等效内建函数apply()apply_async(func, args=(), kwds={}, callback=None)异步,等效内建函数apply()map(func, iterable, chunksize=None)等效内建函数map()map_async(func, iterable, chunksize=None, callback=None)异步,等效内建函数map()imap(func, iterable, chunksize=1)等效内建函数itertools.imap()imap_unordered(func, iterable, chunksize=1)像imap()方法,但结果顺序是任意的close()关闭进程池terminate()终止工作进程,垃圾收集连接池对象join()等待工作进程退出。必须先调用close()或terminate()  Pool.apply_async()和Pool.map_aysnc()又提供了以下几个方法:
get([timeout])获取结果对象里的结果。如果超时没有,则抛出TimeoutError异常wait([timeout])等待可用的结果或超时ready()返回调用是否已经完成successful()  举例:
  1)简单的例子,用子进程处理函数
from multiprocessing import Process  
import os
  
def worker(name):
  
    print name
  
    print 'parent process id:', os.getppid()
  
    print 'process id:', os.getpid()
  
if __name__ == '__main__':
  
    p = Process(target=worker, args=('function worker.',))
  
    p.start()
  
    p.join()
  
    print p.name
  

  
# python test.py
  
function worker.
  
parent process id: 9079
  
process id: 9080
  
Process-1
  Process实例传入worker函数作为派生进程执行的任务,用start()方法启动这个实例。
  2)加以说明join()方法
from multiprocessing import Process  
import os
  
def worker(n):
  
    print 'hello world', n
  
if __name__ == '__main__':
  
    print 'parent process id:', os.getppid()
  
    for n in range(5):
  
        p = Process(target=worker, args=(n,))
  
        p.start()
  
        p.join()
  
        print 'child process id:', p.pid
  
        print 'child process name:', p.name
  

  
# python test.py
  
parent process id: 9041
  
hello world 0
  
child process id: 9132
  
child process name: Process-1
  
hello world 1
  
child process id: 9133
  
child process name: Process-2
  
hello world 2
  
child process id: 9134
  
child process name: Process-3
  
hello world 3
  
child process id: 9135
  
child process name: Process-4
  
hello world 4
  
child process id: 9136
  
child process name: Process-5
  

  
# 把p.join()注释掉再执行
  
# python test.py
  
parent process id: 9041
  
child process id: 9125
  
child process name: Process-1
  
child process id: 9126
  
child process name: Process-2
  
child process id: 9127
  
child process name: Process-3
  
child process id: 9128
  
child process name: Process-4
  
hello world 0
  
hello world 1
  
hello world 3
  
hello world 2
  
child process id: 9129
  
child process name: Process-5
  
hello world 4
  可以看出,在使用join()方法时,输出的结果都是顺序排列的。相反是乱序的。因此join()方法是堵塞父进程,要等待当前子进程执行完后才会继续执行下一个子进程。否则会一直生成子进程去执行任务。
  在要求输出的情况下使用join()可保证每个结果是完整的。
  3)给子进程命名,方便管理
from multiprocessing import Process  
import os, time
  
def worker1(n):
  
    print 'hello world', n
  
def worker2():
  
    print 'worker2...'
  
if __name__ == '__main__':
  
    print 'parent process id:', os.getppid()
  
    for n in range(3):
  
        p1 = Process(name='worker1', target=worker1, args=(n,))
  
        p1.start()
  
        p1.join()
  
        print 'child process id:', p1.pid
  
        print 'child process name:', p1.name
  
    p2 = Process(name='worker2', target=worker2)
  
    p2.start()
  
    p2.join()
  
    print 'child process id:', p2.pid
  
    print 'child process name:', p2.name
  

  
# python test.py
  
parent process id: 9041
  
hello world 0
  
child process id: 9248
  
child process name: worker1
  
hello world 1
  
child process id: 9249
  
child process name: worker1
  
hello world 2
  
child process id: 9250
  
child process name: worker1
  
worker2...
  
child process id: 9251
  
child process name: worker2
  4)设置守护进程,父进程退出也不影响子进程运行
from multiprocessing import Process  
def worker1(n):
  
    print 'hello world', n
  
def worker2():
  
    print 'worker2...'
  
if __name__ == '__main__':
  
    for n in range(3):
  
        p1 = Process(name='worker1', target=worker1, args=(n,))
  
        p1.daemon = True
  
        p1.start()
  
        p1.join()
  
    p2 = Process(target=worker2)
  
    p2.daemon = False
  
    p2.start()
  
    p2.join()
  5)使用进程池
#!/usr/bin/python  
# -*- coding: utf-8 -*-
  
from multiprocessing import Pool, current_process
  
import os, time, sys
  
def worker(n):
  
    print 'hello world', n
  
    print 'process name:', current_process().name  # 获取当前进程名字
  
    time.sleep(1)    # 休眠用于执行时有时间查看当前执行的进程
  
if __name__ == '__main__':
  
    p = Pool(processes=3)
  
    for i in range(8):
  
        r = p.apply_async(worker, args=(i,))
  
        r.get(timeout=5)  # 获取结果中的数据
  
    p.close()
  

  
# python test.py
  
hello world 0
  
process name: PoolWorker-1
  
hello world 1
  
process name: PoolWorker-2
  
hello world 2
  
process name: PoolWorker-3
  
hello world 3
  
process name: PoolWorker-1
  
hello world 4
  
process name: PoolWorker-2
  
hello world 5
  
process name: PoolWorker-3
  
hello world 6
  
process name: PoolWorker-1
  
hello world 7
  
process name: PoolWorker-2
  进程池生成了3个子进程,通过循环执行8次worker函数,进程池会从子进程1开始去处理任务,当到达最大进程时,会继续从子进程1开始。
  在运行此程序同时,再打开一个终端窗口会看到生成的子进程:
# ps -ef |grep python  
root      40244   9041  4 16:43 pts/3   00:00:00 python test.py
  
root      40245  40244  0 16:43 pts/3    00:00:00 python test.py
  
root      40246  40244  0 16:43 pts/3    00:00:00 python test.py
  
root      40247  40244  0 16:43 pts/3    00:00:00 python test.py
  6)进程池map()方法
  map()方法是将序列中的元素通过函数处理返回新列表。
from multiprocessing import Pool  
def worker(url):
  
    return 'http://%s' % url
  
urls = ['www.baidu.com', 'www.jd.com']
  
p = Pool(processes=2)
  
r = p.map(worker, urls)
  
p.close()
  
print r
  

  
# python test.py
  
['http://www.baidu.com', 'http://www.jd.com']
  7)Queue进程间通信
  multiprocessing支持两种类型进程间通信:Queue和Pipe。
  Queue库已经封装到multiprocessing库中,在第十章 Python常用标准库已经讲解到Queue库使用,有需要请查看以前博文。
  例如:一个子进程向队列写数据,一个子进程读取队列数据
#!/usr/bin/python  
# -*- coding: utf-8 -*-
  
from multiprocessing import Process, Queue
  
# 写数据到队列
  
def write(q):
  
    for n in range(5):
  
        q.put(n)
  
        print 'Put %s to queue.' % n
  
# 从队列读数据
  
def read(q):
  
    while True:
  
        if not q.empty():
  
            value = q.get()
  
            print 'Get %s from queue.' % value
  
        else:
  
            break
  
if __name__ == '__main__':
  
    q = Queue()
  
    pw = Process(target=write, args=(q,))
  
    pr = Process(target=read, args=(q,))
  
    pw.start()
  
    pw.join()
  
    pr.start()
  
    pr.join()
  

  
# python test.py
  
Put 0 to queue.
  
Put 1 to queue.
  
Put 2 to queue.
  
Put 3 to queue.
  
Put 4 to queue.
  
Get 0 from queue.
  
Get 1 from queue.
  
Get 2 from queue.
  
Get 3 from queue.
  
Get 4 from queue.
  8)Pipe进程间通信
from multiprocessing import Process, Pipe  
def f(conn):
  
    conn.send([42, None, 'hello'])
  
    conn.close()
  
if __name__ == '__main__':
  
    parent_conn, child_conn = Pipe()
  
    p = Process(target=f, args=(child_conn,))
  
    p.start()
  
    print parent_conn.recv()
  
    p.join()
  

  
# python test.py
  
[42, None, 'hello']
  Pipe()创建两个连接对象,每个链接对象都有send()和recv()方法,
  9)进程间对象共享
  Manager类返回一个管理对象,它控制服务端进程。提供一些共享方式:Value()、Array()、list()、dict()、Event()等
  创建Manger对象存放资源,其他进程通过访问Manager获取。
from multiprocessing import Process, Manager  
def f(v, a, l, d):
  
    v.value = 100
  
    a[0] = 123
  
    l.append('Hello')
  
    d['a'] = 1
  
mgr = Manager()
  
v = mgr.Value('v', 0)
  
a = mgr.Array('d', range(5))
  
l = mgr.list()
  
d = mgr.dict()
  
p = Process(target=f, args=(v, a, l, d))
  
p.start()
  
p.join()
  
print(v)
  
print(a)
  
print(l)
  
print(d)
  

  
# python test.py
  
Value('v', 100)
  
array('d', [123.0, 1.0, 2.0, 3.0, 4.0])
  
['Hello']
  
{'a': 1}
  10)写一个多进程的例子
  比如:多进程监控URL是否正常
from multiprocessing import Pool, current_process  
import urllib2
  
urls = [
  
    'http://www.baidu.com',
  
    'http://www.jd.com',
  
    'http://www.sina.com',
  
    'http://www.163.com',
  
]
  
def status_code(url):
  
    print 'process name:', current_process().name
  
    try:
  
        req = urllib2.urlopen(url, timeout=5)
  
        return req.getcode()
  
    except urllib2.URLError:
  
        return
  
p = Pool(processes=4)
  
for url in urls:
  
    r = p.apply_async(status_code, args=(url,))
  
    if r.get(timeout=5) == 200:
  
        print "%s OK" %url
  
    else:
  
        print "%s NO" %url
  

  
# python test.py
  
process name: PoolWorker-1
  
http://www.baidu.com OK
  
process name: PoolWorker-2
  
http://www.jd.com OK
  
process name: PoolWorker-3
  
http://www.sina.com OK
  
process name: PoolWorker-4
  
http://www.163.com OK
  
  博客地址:http://lizhenliang.blog.51cto.com
  QQ群:323779636(Shell/Python运维开发群)
  
  15.2 threading
  threading模块类似于multiprocessing多进程模块,使用方法也基本一样。threading库是对thread库进行二次封装,我们主要用到Thread类,用Thread类派生线程对象。
  1)使用Thread类实现多线程
from threading import Thread, current_thread  
def worker(n):
  
    print 'thread name:', current_thread().name
  
    print 'hello world', n
  

  
for n in range(5):
  
    t = Thread(target=worker, args=(n, ))
  
    t.start()
  
    t.join()  # 等待主进程结束
  

  
# python test.py
  
thread name: Thread-1
  
hello world 0
  
thread name: Thread-2
  
hello world 1
  
thread name: Thread-3
  
hello world 2
  
thread name: Thread-4
  
hello world 3
  
thread name: Thread-5
  
hello world 4
  2)还有一种方式继承Thread类实现多线程,子类可以重写__init__和run()方法实现功能逻辑。
#!/usr/bin/python  
# -*- coding: utf-8 -*-
  
from threading import Thread, current_thread
  
class Test(Thread):
  
    # 重写父类构造函数,那么父类构造函数将不会执行
  
    def __init__(self, n):
  
        Thread.__init__(self)
  
        self.n = n
  
    def run(self):
  
        print 'thread name:', current_thread().name
  
        print 'hello world', self.n
  
if __name__ == '__main__':
  
    for n in range(5):
  
        t = Test(n)
  
        t.start()
  
        t.join()
  

  
# python test.py
  
thread name: Thread-1
  
hello world 0
  
thread name: Thread-2
  
hello world 1
  
thread name: Thread-3
  
hello world 2
  
thread name: Thread-4
  
hello world 3
  
thread name: Thread-5
  
hello world 4
  3)Lock
from threading import Thread, Lock, current_thread  
lock = Lock()
  
class Test(Thread):
  
    # 重写父类构造函数,那么父类构造函数将不会执行
  
    def __init__(self, n):
  
        Thread.__init__(self)
  
        self.n = n
  
    def run(self):
  
        lock.acquire()  # 获取锁
  
        print 'thread name:', current_thread().name
  
        print 'hello world', self.n
  
        lock.release()  # 释放锁
  
if __name__ == '__main__':
  
    for n in range(5):
  
        t = Test(n)
  
        t.start()
  
        t.join()
  众所周知,Python多线程有GIL全局锁,意思是把每个线程执行代码时都上了锁,执行完成后会自动释放GIL锁,意味着同一时间只有一个线程在运行代码。由于所有线程共享父进程内存、变量、资源,很容易多个线程对其操作,导致内容混乱。
  当你在写多线程程序的时候如果输出结果是混乱的,这时你应该考虑到在不使用锁的情况下,多个线程运行时可能会修改原有的变量,导致输出不一样。
  由此看来Python多线程是不能利用多核CPU提高处理性能,但在IO密集情况下,还是能提高一定的并发性能。也不必担心,多核CPU情况可以使用多进程实现多核任务。Python多进程是复制父进程资源,互不影响,有各自独立的GIL锁,保证数据不会混乱。能用多进程就用吧!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-546449-1-1.html 上篇帖子: 为什么学习python及python的安装 下篇帖子: Python 利用socket 实现 ssh 跳转
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表