Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: execute cat subprocess in parallel

I am running several cat | zgrep commands on a remote server and gathering their output individually for further processing:

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('\n')
        process_data(log_lines)

This, however, results in sequential execution of the subprocess('ssh ... cat ...') commands. Second peak waits for first to finish and so on.

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output for each individually?

like image 805
liarspocker Avatar asked May 12 '14 14:05

liarspocker


1 Answers

You don't need multiprocessing or threading to run subprocesses in parallel. For example:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

it runs 5 shell commands simultaneously. Note: neither threads nor multiprocessing module are used here. There is no point to add ampersand & to the shell commands: Popen doesn't wait for the command to complete. You need to call .wait() explicitly.

It is convenient but it is not necessary to use threads to collect output from subprocesses:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?.

Here's code example that gets output from several subprocesses concurrently in the same thread:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()
like image 186
jfs Avatar answered Oct 10 '22 22:10

jfs