|
def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required '''
manager = multiprocessing.Manager()
job_queue = manager.Queue()
for host in hosts:
job_queue.put(host)
result_queue = manager.Queue()
try:
fileno = sys.stdin.fileno()
except ValueError:
fileno = None
workers = []
for i in range(self.forks):
new_stdin = None
if fileno is not None:
try:
new_stdin = os.fdopen(os.dup(fileno))
except OSError, e:
# couldn't dupe stdin, most likely because it's
# not a valid file descriptor, so we just rely on
# using the one that was passed in
pass
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()
results = []
try:
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
# ***************************************************** |
|
|