Python multiple subprocess with a pool/queue recover output as soon as one finishes and launch next job in queue

I'm currently launching a subprocess and parsing its stdout on the fly, without waiting for the process to finish.

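import subprocess

for sample in all_samples:
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
    # readline() returns b'' once the tool closes stdout, which ends the loop
    for myline in iter(my_tool_subprocess.stdout.readline, b''):
        pass  # here I parse stdout..
    my_tool_subprocess.wait()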

In my script I perform this action multiple times, once per input sample.

The main problem is that each subprocess runs a program/tool that uses 100% of one CPU while it is running, and it takes a while: roughly 20-40 minutes per input.

What I would like to achieve is a pool (or queue; I'm not sure of the exact terminology) that runs at most N subprocess jobs at the same time, so I can make use of all the CPUs instead of processing the samples sequentially.

For example, with a pool of at most 4 jobs, the execution flow would be:

  • Launch 4 subprocess.
  • When one of the jobs finishes, parse its stdout and launch the next one.
  • Do this until all the jobs in queue are finished.
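The execution flow listed above can be sketched with plain subprocess.Popen and polling, without any pool class. This is a minimal sketch under stated assumptions: a short Python one-liner (run through sys.executable) stands in for the hypothetical `mytool`, each job writes its stdout to a temporary file so a full pipe can never stall a running job, and the helper names `launch` and `run_all` are made up for illustration. Because output is parsed only after a job exits, this variant gives up line-by-line streaming.

```python
import subprocess
import sys
import tempfile
import time


def launch(sample):
    # Hypothetical stand-in for `mytool <sample>`: a Python one-liner printing two lines.
    out = tempfile.TemporaryFile()  # stdout goes to a file, so the child never blocks on a full pipe
    cmd = [sys.executable, "-c",
           "import sys; print('start', sys.argv[1]); print('done', sys.argv[1])", sample]
    return subprocess.Popen(cmd, stdout=out), out


def run_all(samples, max_jobs=4):
    pending = list(samples)
    running = {}  # Popen -> (sample, output file), so a finished process maps back to its sample
    results = {}
    while pending or running:
        # Fill free slots from the queue until max_jobs processes are running.
        while pending and len(running) < max_jobs:
            sample = pending.pop(0)
            proc, out = launch(sample)
            running[proc] = (sample, out)
        # poll() returns None while a process is still running.
        finished = [p for p in running if p.poll() is not None]
        for proc in finished:
            sample, out = running.pop(proc)
            out.seek(0)
            results[sample] = out.read().decode().splitlines()  # parse stdout here
            out.close()
        if not finished:
            time.sleep(0.05)  # avoid a busy loop while every slot is occupied
    return results


if __name__ == "__main__":
    for sample, lines in run_all(["s1", "s2", "s3", "s4", "s5", "s6"]).items():
        print(sample, lines)
```

The `running` dict is what answers the "which sample finished?" question here: the finished Popen object is the key that leads back to its sample.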

Even if I can achieve this, I don't know how I could identify which sample's subprocess is the one that has finished. Right now I don't need to identify them, since the subprocesses run sequentially and I parse stdout while each one is printing it.

This is really important, since I need to match the output of each subprocess to its corresponding input sample.
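One way to tell which sample a finished job belongs to is to keep the sample next to the job handle. The sketch below uses concurrent.futures, a standard-library alternative to the ThreadPool suggested in the answer (not what the question uses): a dict maps each Future to its sample, and as_completed yields the futures in the order they finish. `run_tool`, `run_all` and the one-liner standing in for `mytool` are hypothetical.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_tool(sample):
    # Hypothetical stand-in for `mytool <sample>`; replace with the real command.
    cmd = [sys.executable, "-c", "import sys; print('result for', sys.argv[1])", sample]
    lines = []
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:  # lines are read while the tool is still running
            lines.append(line.rstrip("\n"))  # parse each line here
    return lines  # leaving the with-block waits for the process to exit


def run_all(samples, max_jobs=4):
    results = {}
    with ThreadPoolExecutor(max_workers=max_jobs) as pool:
        # Map each Future back to the sample it was submitted for.
        future_to_sample = {pool.submit(run_tool, s): s for s in samples}
        for future in as_completed(future_to_sample):  # completion order, not submit order
            sample = future_to_sample[future]
            results[sample] = future.result()  # re-raises any exception from run_tool
    return results


if __name__ == "__main__":
    for sample, lines in run_all(["s1", "s2", "s3"]).items():
        print(sample, lines)
```

Each worker thread only waits on its subprocess, so the CPU-heavy work stays in the tool processes, and max_workers caps how many of them run at once.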

asked Nov 06 '14 08:11 by gmarco


1 Answer

ThreadPool could be a good fit for your problem: you set the number of worker threads and add jobs, and the threads work their way through all the tasks. Each thread just waits on its own subprocess, so at most that many tools run at the same time.

from multiprocessing.pool import ThreadPool
import subprocess


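def work(sample):
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
    parsed = []
    # readline() returns b'' once the tool closes stdout, which ends the loop
    for myline in iter(my_tool_subprocess.stdout.readline, b''):
        parsed.append(myline)  # here I parse stdout..
    my_tool_subprocess.wait()
    return sample, parsed  # tag the output with the sample it came from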


num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
    tp.apply_async(work, (sample,))

tp.close()
tp.join()
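Since apply_async above throws away the value returned by work, here is a variant that still uses ThreadPool but collects each output together with its sample: work returns a (sample, lines) pair, and imap_unordered yields each pair as soon as that job finishes. This is a sketch; the command is a hypothetical stand-in for `mytool`, and `run_all` is an illustrative name.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool


def work(sample):
    # Hypothetical stand-in for `mytool <sample>`.
    cmd = [sys.executable, "-c", "import sys; print('out', sys.argv[1])", sample]
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
        lines = [line.rstrip("\n") for line in proc.stdout]  # parse stdout here
    return sample, lines  # pair the output with its input sample


def run_all(samples, num=4):
    results = {}
    tp = ThreadPool(num)
    try:
        # imap_unordered yields results in completion order, one per sample
        for sample, lines in tp.imap_unordered(work, samples):
            results[sample] = lines
    finally:
        tp.close()
        tp.join()
    return results


if __name__ == "__main__":
    print(run_all(["s1", "s2", "s3"]))
```

apply_async also accepts a callback= argument that receives the return value when a job completes, which is another way to collect results. Note that with plain apply_async an exception raised inside work is silently lost unless you call get() on the returned AsyncResult or pass error_callback=.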
answered Sep 29 '22 20:09 by GP89