
Control the number of subprocesses used to call external commands in Python

I understand that using subprocess is the preferred way of calling external commands.

But what if I want to run several commands in parallel, but limit the number of processes being spawned? What bothers me is that I can't block the subprocesses. For example, if I call

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

Then the calling process will continue without waiting for cmd to finish. Therefore, I can't wrap it up in a worker from the multiprocessing library.

For example, if I do:

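import subprocess
from multiprocessing import Pool

def worker(cmd):
    # Popen returns immediately, so the worker finishes right away
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

pool = Pool(processes=10)
results = [pool.apply_async(worker, [cmd]) for cmd in cmd_list]
ans = [res.get() for res in results]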

then each worker will finish and return after spawning a subprocess. So I can't really limit the number of processes generated by subprocess by using Pool.
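The behaviour described above can be checked with a small timing sketch. It assumes a hypothetical command (a child Python interpreter that sleeps for half a second) in place of cmd, and it compares Popen, which returns at once, with Popen.wait() and subprocess.call(), which both block until the child exits.

```python
import subprocess
import sys
import time

# Hypothetical stand-in for an external command: a child Python that sleeps.
cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]

start = time.time()
p = subprocess.Popen(cmd)          # returns as soon as the child is started
popen_elapsed = time.time() - start

p.wait()                           # blocks until the child exits
wait_elapsed = time.time() - start

start = time.time()
rc = subprocess.call(cmd)          # starts the child and waits for it
call_elapsed = time.time() - start

print(popen_elapsed, wait_elapsed, call_elapsed, rc)
```

A worker that uses a blocking call like this stays busy for as long as its command runs, which is what lets a pool size act as a limit on running subprocesses.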

What's the proper way of limiting the number of subprocesses?

CuriousMind asked Mar 21 '12 16:03



1 Answer

You don't need multiple Python processes or even threads to limit the maximum number of parallel subprocesses:

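try:
    from itertools import zip_longest  # Python 3
except ImportError:
    from itertools import izip_longest as zip_longest  # Python 2
from subprocess import Popen, STDOUT

# itertools' grouper recipe: one shared generator repeated `limit` times,
# so each zip_longest step starts the next `limit` processes
groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
          for cmd in commands)] * limit
for processes in zip_longest(*groups): # run len(processes) == limit at a time
    for p in filter(None, processes):
        p.wait()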

See Iterate an iterator by chunks (of n) in Python?
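To see why the grouper trick starts only limit processes at a time, here is a minimal sketch that replaces Popen with a hypothetical fake_start function that merely records each start. The commands, limit value, and helper name are illustrative assumptions; the import shown is the Python 3 spelling.

```python
from itertools import zip_longest  # izip_longest on Python 2

started = []  # records the order in which "processes" are created

def fake_start(cmd):
    # Hypothetical stand-in for Popen(cmd, ...): just log the start.
    started.append(cmd)
    return cmd

commands = ["a", "b", "c", "d", "e"]
limit = 2

# The same generator object repeated `limit` times, so every zip_longest
# step pulls the next `limit` items from one shared iterator.
groups = [(fake_start(cmd) for cmd in commands)] * limit

batches = []
for batch in zip_longest(*groups):
    # Only the items up to and including this batch have been started.
    batches.append((batch, len(started)))

print(batches)
```

Because the generator is lazy, nothing in a later batch is started until the loop body for the current batch has finished. The flip side is that each batch waits for its slowest process, so fewer than limit processes may be running toward the end of a batch.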

If you'd like to limit both the maximum and the minimum number of parallel subprocesses, you could use a thread pool:

from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call

def run(cmd):
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
    if rc != 0:
        print('{cmd} failed with exit status: {rc}'.format(**vars()))

As soon as any of the limit subprocesses ends, a new subprocess is started, so limit subprocesses are kept running for as long as commands remain.
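A self-contained sketch of the thread pool approach follows. The commands (child Python interpreters that sleep or exit with status 3), the limit value, and the running/peak counters are assumptions added for illustration; the counters only exist to show that no more than limit commands run at once.

```python
import subprocess
import sys
import threading
from multiprocessing.pool import ThreadPool

limit = 2
# Hypothetical commands: child Pythons that sleep briefly, plus one that fails.
commands = [[sys.executable, "-c", "import time; time.sleep(0.2)"]] * 4
commands = commands + [[sys.executable, "-c", "import sys; sys.exit(3)"]]

lock = threading.Lock()
running = 0  # commands currently executing
peak = 0     # highest value `running` has reached

def run(cmd):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    try:
        return cmd, subprocess.call(cmd)  # blocks this worker thread only
    finally:
        with lock:
            running -= 1

pool = ThreadPool(limit)
results = list(pool.imap_unordered(run, commands))
pool.close()
pool.join()

failed = [rc for _, rc in results if rc != 0]
print("peak concurrency:", peak, "failed exit codes:", failed)
```

Threads are enough here because each worker spends its time blocked in subprocess.call() waiting for a child process.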

Or using ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor # Python 2 only: pip install futures
from subprocess import STDOUT, call

with ThreadPoolExecutor(max_workers=limit) as executor:
    for cmd in commands:
        executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)
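The ThreadPoolExecutor snippet above discards the futures, so exit statuses are lost. If they are needed, one possible variation is sketched below; the commands and the exit_codes dictionary are hypothetical names introduced for the example.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

limit = 2
# Hypothetical commands: each child exits with a chosen status.
commands = [[sys.executable, "-c", "import sys; sys.exit(%d)" % n]
            for n in (0, 1, 0, 2)]

exit_codes = {}
with ThreadPoolExecutor(max_workers=limit) as executor:
    # Map each future back to the index of the command it runs.
    futures = {executor.submit(subprocess.call, cmd): i
               for i, cmd in enumerate(commands)}
    for future in as_completed(futures):
        exit_codes[futures[future]] = future.result()

print(exit_codes)
```

Calling future.result() also re-raises any exception from the worker, for example an OSError when the executable cannot be found.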

Here's a simple thread pool implementation:

import subprocess
from threading import Thread

try:
    from queue import Queue # Python 3
except ImportError:
    from Queue import Queue # Python 2.x


def worker(queue):
    for cmd in iter(queue.get, None):
        subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
    t.daemon = True
    t.start()

for cmd in commands:  # feed commands to threads
    q.put_nowait(cmd)

for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join()    # wait for completion

To avoid premature exit, add exception handling.
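One way the exception handling might look is sketched here. subprocess.check_call raises CalledProcessError on a non-zero exit status (and OSError if the program cannot be started), which would otherwise end the worker thread while commands are still queued. The commands, limit value, and failures list are assumptions made for the example, and the import is the Python 3 one.

```python
import subprocess
import sys
from queue import Queue
from threading import Thread

limit = 2
# Hypothetical commands: the second one exits with a non-zero status.
commands = [[sys.executable, "-c", "pass"],
            [sys.executable, "-c", "import sys; sys.exit(1)"],
            [sys.executable, "-c", "pass"]]
failures = []  # list.append is atomic in CPython, so no lock is used here

def worker(queue):
    for cmd in iter(queue.get, None):
        try:
            subprocess.check_call(cmd)
        except (subprocess.CalledProcessError, OSError) as exc:
            # Record the failure and keep serving the queue.
            failures.append((cmd, exc))

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads:
    t.daemon = True
    t.start()
for cmd in commands:
    q.put_nowait(cmd)
for _ in threads:
    q.put(None)  # one sentinel per worker
for t in threads:
    t.join()

print(len(failures), "command(s) failed")
```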

If you want to capture subprocess' output in a string, see Python: execute cat subprocess in parallel.
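As a rough illustration of capturing output (not necessarily the approach taken in the linked post), the thread pool can call subprocess.check_output instead of call. The commands and the words list below are hypothetical.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool

limit = 2
# Hypothetical commands: each child prints one word.
words = ["alpha", "beta", "gamma"]
commands = [[sys.executable, "-c", "print(%r)" % w] for w in words]

def run(cmd):
    # Returns the child's stdout (with stderr merged in) as text;
    # raises CalledProcessError if the exit status is non-zero.
    return subprocess.check_output(cmd, stderr=subprocess.STDOUT,
                                   universal_newlines=True)

pool = ThreadPool(limit)
outputs = pool.map(run, commands)  # results in the same order as commands
pool.close()
pool.join()

print(outputs)
```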

jfs answered Sep 24 '22 01:09