色胃康胶囊 发表于 2018-7-30 10:10:13

Ansible源码分析之Python的multiprocessing模块使用

def _parallel_exec(self, hosts):  
      ''' handles mulitprocessing when more than 1 fork is required '''
  

  
      manager = multiprocessing.Manager()
  
      job_queue = manager.Queue()
  
      for host in hosts:
  
            job_queue.put(host)
  
      result_queue = manager.Queue()
  

  
      try:
  
            fileno = sys.stdin.fileno()
  
      except ValueError:
  
            fileno = None
  

  
      workers = []
  
      for i in range(self.forks):
  
            new_stdin = None
  
            if fileno is not None:
  
                try:
  
                  new_stdin = os.fdopen(os.dup(fileno))
  
                except OSError, e:
  
                  # couldn't dupe stdin, most likely because it's
  
                  # not a valid file descriptor, so we just rely on
  
                  # using the one that was passed in
  
                  pass
  
            prc = multiprocessing.Process(target=_executor_hook,
  
                args=(job_queue, result_queue, new_stdin))
  
            prc.start()
  
            workers.append(prc)
  

  
      try:
  
            for worker in workers:
  
                worker.join()
  
      except KeyboardInterrupt:
  
            for worker in workers:
  
                worker.terminate()
  
                worker.join()
  

  
      results = []
  
      try:
  
            while not result_queue.empty():
  
                results.append(result_queue.get(block=False))
  
      except socket.error:
  
            raise errors.AnsibleError("<interrupted>")
  
      return results
  

  
    # *****************************************************
页: [1]
查看完整版本: Ansible源码分析之Python的multiprocessing模块使用