设为首页 收藏本站
查看: 755|回复: 0

[经验分享] Thread pool in python

[复制链接]

尚未签到

发表于 2015-4-26 06:15:14 | 显示全部楼层 |阅读模式
  Thread pool seems not to exist in threading package. I found some codes and one of them fits my application.
  (From http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/)
  
  

DSC0000.gif DSC0001.gif 代码



import threading
from time import sleep
# Ensure booleans exist (not needed for Python 2.2.1 or higher)
try:
    True
except NameError:
    False = 0
    True = not False
class ThreadPool:
    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""
    def __init__(self, numThreads):
        """Initialize the thread pool with numThreads workers."""
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)
    def setThreadCount(self, newNumThreads):
        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True
    def __setThreadCountNolock(self, newNumThreads):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]
    def getThreadCount(self):
        """Return the number of threads in the pool."""
        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()
    def queueTask(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
        if self.__isJoining == True:
            return False
        if not callable(task):
            return False
        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()
    def getNextTask(self):
        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""
        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()
    def joinAll(self, waitForTasks = True, waitForThreads = True):
        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(.1)
        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(0)
            self.__isJoining = True
            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.join()
                    del t
            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

class ThreadPoolThread(threading.Thread):
    """ Pooled thread class. """
    threadSleepTime = 0.1
    def __init__(self, pool):
        """ Initialize the thread and remember the pool. """
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False
    def run(self):
        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """
        while self.__isDying == False:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))
    def goAway(self):
        """ Exit the run loop next time through."""
        self.__isDying = True
# Usage example
if __name__ == "__main__":
    from random import randrange
    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
   
    def sortTask(data):
        print "SortTask starting for ", data
        numbers = range(data[0], data[1])
        for a in numbers:
            rnd = randrange(0, len(numbers) - 1)
            a, numbers[rnd] = numbers[rnd], a
        print "SortTask sorting for ", data
        numbers.sort()
        print "SortTask done for ", data
        return "Sorter ", data
    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print "WaitTask starting for ", data
        print "WaitTask sleeping for %d seconds" % data
        sleep(data)
        return "Waiter", data
    # Both tasks use the same callback

    def taskCallback(data):
        print "Callback called for", data
    # Create a pool with three worker threads

    pool = ThreadPool(3)
    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)
    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()

  


  
  
  Some problems:
  1, However, it's posted in 2003. Now development on python2.6 needn't to ensure boolean exists. Just delete it.
  2, Some comments said that joinAll() couldn't work. Updates codes as
Carl Kleffner gave:  
  
  

代码



def joinAll(self, waitForTasks = True, waitForThreads = True):
        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(0.1)
        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.goAway()
                for t in self.__threads:
                    t.join()
                    # print t,"joined"
                    del t
            self.__setThreadCountNolock(0)
            self.__isJoining = True
            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

  


  
  3, Busy wait. All the threads would be started when pool initiallation. And  kept  waiting for  task in queue.
  
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-60631-1-1.html 上篇帖子: 给Asus WL500G Deluxe无线路由器日志文件添加IP地址地理位置信息的Python小程序 下篇帖子: python 网络编程学习: 2 SOCKET
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表