设为首页 收藏本站
查看: 1098|回复: 0

[经验分享] Python:线程、进程与协程(7)——线程池

[复制链接]

尚未签到

发表于 2018-8-4 12:53:12 | 显示全部楼层 |阅读模式
  前面转载了一篇分析进程池源码的博文,是一篇分析进程池很全面的文章,点击此处可以阅读。在Python中还有一个线程池的概念,它也有并发处理能力,在一定程度上能提高系统运行效率;不正之处欢迎批评指正。
  线程的生命周期可以分为5个状态:创建、就绪、运行、阻塞和终止。自线程创建到终止,线程便不断在运行、创建和销毁这3个状态。一个线程的运行时间可由此可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。看看之前介绍线程的博文的例子中(点击此处可以阅读),有多少个任务,就创建多少个线程,但是由于Python特有的GIL限制,它并不是真正意义上的多线程,反而会因为频繁的切换任务等开销而降低了性能(点击此处可以了解Python的GIL)。这种情况下可以使用线程池提高运行效率。
  线程池的基本原理如下图,它是通过将事先创建多个能够执行任务的线程放入池中,所需要执行的任务通常要被安排在队列任务中。一般情况下,需要处理的任务比线程数目要多,线程执行完当前任务后,会从队列中取下一个任务,知道所有的任务完成。
DSC0000.png

  由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。所以,说白了,Python的线程池也没有利用到多核或者多CPU的优势,只是跟普通的多线程相比,它不用去多次创建线程,节省了线程创建和销毁的时间,从而提高了性能。
  Python中 线程池技术适合处理突发性大量请求或者需要大量线程来完成任务、但每个任务实际处理时间较短的场景,它能有效的避免由于系统创建线程过多而导致性能负荷过大、响应过慢等问题。下面介绍几种利用线程池的方法。
  (一)自定义线程池模式
  我们可以利用Queue模块和threading模块来实现线程池。Queue用来创建任务队列,threading用来创建一个线程池子。
  看下面例子
import Queue,threading  

  
class Worker(threading.Thread):
  
    """
  
    定义一个能够处理任务的线程类,属于自定义线程类,自定义线程类就需要定义run()函数
  
    """
  

  
    def __init__(self,workqueue,resultqueue,**kwargs):
  
        threading.Thread.__init__(self,**kwargs)
  
        self.workqueue = workqueue#存放任务的队列,任务一般都是函数
  
        self.resultqueue = resultqueue#存放结果的队列
  

  
    def run(self):
  
        while True:
  
            try:
  
                #从任务队列中取出一个任务,block设置为False表示如果队列空了,就会抛出异常
  
                callable,args,kwargs = self.workqueue.get(block=False)
  
                res = callable(*args,**kwargs)
  
                self.resultqueue.put(res)#将任务的结果存放到结果队列中
  
            except Queue.Empty:#抛出空队列异常
  
                break
  

  
class WorkerManger(object):
  
    """
  
    定义一个线程池的类
  
    """
  
    def __init__(self,num=10):#默认这个池子里有10个线程
  
        self.workqueue = Queue.Queue()#任务队列,
  
        self.resultqueue = Queue.Queue()#存放任务结果的队列
  
        self.workers = []#所有的线程都存放在这个列表中
  
        self._recruitthreads(num)#创建一系列线程的函数
  
    def _recruitthreads(self,num):
  
        """
  
        创建线程
  
        """
  
        for i in xrange(num):
  
            worker = Worker(self.workqueue,self.resultqueue)
  
            self.workers.append(worker)
  

  
    def start(self):
  
        """
  
        启动线程池中每个线程
  
        """
  
        for work in self.workers:
  
            work.start()
  

  
    def wait_for_complete(self):
  
        """
  
        等待至任务队列中所有任务完成
  
        """
  
        while len(self.workers):
  
            worker = self.workers.pop()
  
            worker.join()
  
            if worker.isAlive() and not self.workqueue.empty():
  
                self.workers.append(worker)
  

  
    def add_job(self,callable,*args,**kwargs):
  
        """
  
        往任务队列中添加任务
  
        """
  
        self.workqueue.put((callable,args,kwargs))
  

  

  
    def get_result(self,*args,**kwargs):
  
        """
  
        获取结果队列
  
        """
  
        return self.resultqueue.get(*args,**kwargs)
  

  
    def add_result(self,result):
  
        self.resultqueue.put(result)
  上面定义了一个线程池,它的初始化函数__init__()定义了一些存放相关数据的属性,这在Python的一些内部模块的类的定义中很常见,所有有时候多看看源码其实挺好的,学习大神的编程习惯和编程思想。
  另外还要提到一点,Queue模块中的队列,不仅可以存放数据(指字符串,数值,列表,字典等等),还可以存放函数的(也就是任务),上面的代码中,callable是一个函数,当用put()将一个函数添加到队列时,put()接受的参数有函数对象以及该函数的相关参数,而且要是一个整体,所以就有了上面代码中的self.workqueue.put((callable,args,kwargs))。同理,当从这种存放函数的队列中取出数据,它返回的就是一个函数对象包括它的相关参数,有兴趣的可以打印出上面代码中run()里的callable,args,kwargs。如果你对Queue模块不了解,可参考我之前的博文,点击此处即可阅读。
  下面就简单的举个小例子吧。
import urllib2,datetime  
def open_url(url):
  
    try:
  
        res = urllib2.urlopen(url).getcode()
  
    except urllib2.HTTPError, e:
  
        res = e.code
  
    #print res
  
    res = str(res)
  
    with open('/home/liulonghua/无标题文档','wr') as f:
  
        f.write(res)
  
    return res
  
if __name__ == "__main__":
  
    urls = [
  
        'http://www.python.org',
  
        'http://www.python.org/about/',
  
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  
        'http://www.python.org/doc/',
  
        'http://www.python.org/download/',
  
        'http://www.python.org/getit/',
  
        'http://www.python.org/community/',
  
        'https://wiki.python.org/moin/',
  
        'http://planet.python.org/',
  
        'https://wiki.python.org/moin/LocalUserGroups',
  
        'http://www.python.org/psf/',
  
        'http://docs.python.org/devguide/',
  
        'http://www.python.org/community/awards/'
  
    ]
  
    t1 = datetime.datetime.now()
  
    w = WorkerManger(2)
  
    for url in urls:
  
        w.add_job(open_url,url)
  
    w.start()
  
    w.wait_for_complete()
  
    t2 = datetime.datetime.now()
  
    print t2 - t1
  最后结果如下:
DSC0001.png

  如果把上面代码改成用多线程而不是用线程池,会是怎样的呢?
  代码如下:
if __name__ == "__main__":  
    urls = [
  
        'http://www.python.org',
  
        'http://www.python.org/about/',
  
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  
        'http://www.python.org/doc/',
  
        'http://www.python.org/download/',
  
        'http://www.python.org/getit/',
  
        'http://www.python.org/community/',
  
        'https://wiki.python.org/moin/',
  
        'http://planet.python.org/',
  
        'https://wiki.python.org/moin/LocalUserGroups',
  
        'http://www.python.org/psf/',
  
        'http://docs.python.org/devguide/',
  
        'http://www.python.org/community/awards/'
  
    ]
  
    t1 = datetime.datetime.now()
  
    for url in urls:
  
        t = threading.Thread(target=open_url,args=(url,))
  
        t.start()
  
        t.join()
  
    t2 = datetime.datetime.now()
  
    print t2-t1
  运行结果如下:
DSC0002.png

  运行效率的差异还是很大的,有兴趣的可以动手试试。
  (二)使用现成的线程池模块
  下载安装也很简单,用pip工具
sudo pip install threadpool  注意:这里要提到一点,我就陷入这个坑,还好没有花多长时间就解决了。由于我的电脑里有python2.7.12,python3.5,还有一个PyPy5.4.1,上面的指令竟然将threadpool包安装到了PyPy目录下了,所以在python2.7.12里,我import threadpool,它一直报错,如果你的系统里有多个Python版本,又没有用virtualenvs虚拟环境工具,很容易造成这种混乱,虽然我安装了virtualenvs,但在自己的电脑上很少用,这里的解决方法是:
sudo python -m pip install threadpool  以区分PyPy,同理如果是在PyPy环境下安装第三方包的话,用sudo pypy -m pip install packagename,这个在之前的博文中也有介绍,感兴趣的可以点此
  该模块主要的类和方法:
  1.threadpool.ThreadPool:线程池类,主要是用来分派任务请求和收集运行结果。主要方法有:
  (1)__init__(self,number_workers,q_size,resq_size=0,poll_timeout=5):
  建立线程池,并启动对应的num_workers的线程;q_size表示任务请求队列的大小,resq_size表示存放运行结果队列的大小。
  (2)createWorkers(self,num_workers,poll_timeout=5):
  将num_workers数量对应的线程加入线程池
  (3)dismissWorkers(self,num_workers,do_join=False):
  告诉num_workers数量的工作线程在执行完当前任务后退出
  (4)joinAllDismissWorkers(self):
  在设置为退出的线程上执行Thread.join
  (5)putRequest(self,request,block=True,timeout=None):
  加入一个任务请求到工作队列
  (6)pool(self,block=False)
  处理任务队列中新请求。也就是循环的调用各个线程结果中的回调和错误回调。不过,当请求队列为空时会抛出 NoResultPending 异常,以表示所有的结果都处理完了。这个特点对于依赖线程执行结果继续加入请求队列的方式不太适合。
  (7)wait(self)
  等待执行结果,直到所有任务完成。当所有执行结果返回后,线程池内部的线程并没有销毁,而是在等待新任务。因此,wait()之后依然可以在此调用pool.putRequest()往其中添加任务。
  2. threadpool.WorkerThread:处理任务的工作线程,主要有run()方法和dismiss()方法。
  3.threadpool.WorkRequest:任务请求类,包含有具体执行方法的工作请求类
  __init__(self,callable,args=None,kwds=None,requestID=None,callback=None,exc_callback=None)
  创建一个工作请求。
  4.makeRequests(callable_,args_list,callback=None,exc_callback=_handle_thread_exception):
  主要函数,用来创建具有相同的执行函数但参数不同的一系列工作请求。
  有了上面自定义线程池模式的基础,这个模块不难理解,有兴趣的可以去看看该模块的源码。它的使用步骤一般如下:
  (1)引入threadpool模块
  (2)定义线程函数
  (3)创建线程 池threadpool.ThreadPool()
  (4)创建需要线程池处理的任务即threadpool.makeRequests()
  (5)将创建的多个任务put到线程池中,threadpool.putRequest
  (6)等到所有任务处理完毕theadpool.pool()
  将上面的例子用线程池模块进行修改,代码如下:
import threadpool  
if __name__ == "__main__":
  
    urls = [
  
        'http://www.python.org',
  
        'http://www.python.org/about/',
  
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  
        'http://www.python.org/doc/',
  
        'http://www.python.org/download/',
  
        'http://www.python.org/getit/',
  
        'http://www.python.org/community/',
  
        'https://wiki.python.org/moin/',
  
        'http://planet.python.org/',
  
        'https://wiki.python.org/moin/LocalUserGroups',
  
        'http://www.python.org/psf/',
  
        'http://docs.python.org/devguide/',
  
        'http://www.python.org/community/awards/'
  
    ]
  
    t1 = datetime.datetime.now()
  
    pool = threadpool.ThreadPool(2)
  

  
    requests = threadpool.makeRequests(open_url,urls)
  
    [pool.putRequest(req) for req in requests]
  
    pool.wait()
  
    t2 = datetime.datetime.now()
  
    print t2-t1
  执行结果如下:
DSC0003.png

  该模块的其它方法,感兴趣的可以自己动手体会下。
  (3)multiprocessing.dummy 执行多线程任务
  multiprocessing.dummy 模块与 multiprocessing 模块的区别: dummy 模块是多线程,而 multiprocessing 是多进程, api 都是通用的。
  Python3里的multiprocessing里也有现成的线程池,如下
from multiprocessing.pool import ThreadPool  有时候看到有人这么用dummy,from multiprocessing.dummy import Pool as ThreadPool ,把它当作了一个线程池。它的属性和方法可以参考进程池。将上面的例子可以用这种方法改下代码如下:
from multiprocessing.dummy import Pool as ThreadPool  
if __name__ == "__main__":
  
    urls = [
  
        'http://www.python.org',
  
        'http://www.python.org/about/',
  
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  
        'http://www.python.org/doc/',
  
        'http://www.python.org/download/',
  
        'http://www.python.org/getit/',
  
        'http://www.python.org/community/',
  
        'https://wiki.python.org/moin/',
  
        'http://planet.python.org/',
  
        'https://wiki.python.org/moin/LocalUserGroups',
  
        'http://www.python.org/psf/',
  
        'http://docs.python.org/devguide/',
  
        'http://www.python.org/community/awards/'
  
    ]
  
    t1 = datetime.datetime.now()
  
    pool =ThreadPool(2)
  
    pool.map(open_url,urls)
  
    pool.close()
  
    pool.join()
  
    t2 = datetime.datetime.now()
  
    print t2-t1
  运行结果如下:
DSC0004.png

  我觉得上面三种方法的主体思路还是差不多的,还是比较好理解的,希望对你有帮助,不正之处欢迎批评指正!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-546495-1-1.html 上篇帖子: python学习笔记-利器篇1:IPython 下篇帖子: python 异常学习3---python异常except语句用法与引发异常
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表