I am trying to use Sailfish, which takes multiple fastq files as arguments, in a ruffus pipeline. I execute Sailfish using the subprocess module in Python, but <() in the subprocess call does not work even when I set shell=True.
This is the command I want to execute using python:
sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]
or (preferably):
sailfish quant [options] -1 <(gunzip -c sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip -c sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]
A generalization:
someprogram <(someprocess) <(someprocess)
How would I go about doing this in python? Is subprocess the right approach?
The C library's popen() function executes the command given by the string command. It creates a pipe between the calling program and the executed command, and returns a pointer to a stream that can be used to either read from or write to the pipe.
os.pipe() in Python creates a pipe, a mechanism for passing information from one process to another. It offers only one-way communication, and the data written is held by the system until the receiving process reads it.
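As a small illustration of the one-way behaviour described above, here is a minimal sketch that writes to a pipe and reads the data back within a single process. The function name pipe_roundtrip is made up for this example; in real use the two ends would normally belong to different processes.

```python
import os

def pipe_roundtrip(payload):
    """Write payload into a fresh pipe and read it back (hypothetical helper)."""
    read_fd, write_fd = os.pipe()      # read end, write end
    try:
        os.write(write_fd, payload)    # the kernel buffers the data
    finally:
        os.close(write_fd)             # closing the write end signals EOF
    try:
        return os.read(read_fd, 1024)  # fetch what was buffered
    finally:
        os.close(read_fd)

data = pipe_roundtrip(b"hello")
```

This only works for payloads smaller than the pipe buffer; a larger write would block until something reads from the other end.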
To emulate the bash process substitution:
#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')
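For the Sailfish command in the question, the command string could be assembled along these lines. This is only a sketch: build_command is a hypothetical helper, the file names are the ones from the question, and the elided [options] and the -o argument are left out rather than guessed.

```python
import shlex  # shlex.quote is available in Python 3.3+

def build_command(program_args, inputs):
    """Return a bash command with one <(cat ...) per (flag, files) pair."""
    parts = [shlex.quote(arg) for arg in program_args]
    for flag, files in inputs:
        quoted_files = ' '.join(shlex.quote(name) for name in files)
        parts.append('%s <(cat %s)' % (flag, quoted_files))
    return ' '.join(parts)

command = build_command(
    ['sailfish', 'quant'],
    [('-1', ['sample1a.fastq', 'sample1b.fastq']),
     ('-2', ['sample2a.fastq', 'sample2b.fastq'])])

# To run it with bash, as above:
# check_call(command, shell=True, executable='/bin/bash')
```

Quoting each file name with shlex.quote keeps names that contain spaces or shell metacharacters from being split or interpreted by bash.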
In Python, you could use named pipes:
#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()
where named_pipes(n) is:
import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)
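To try the named-pipe approach without Sailfish installed, here is a self-contained sketch that uses the Python interpreter itself as a stand-in for both the consumer and the two producers. The demo function and the inline scripts are assumptions made for illustration, and it needs a POSIX system where os.mkfifo is available.

```python
import os
import shutil
import sys
import tempfile
from contextlib import contextmanager
from subprocess import Popen, PIPE

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

# Stand-in for someprogram: prints the contents of each file it is given.
CONSUMER = ("import sys\n"
            "for name in sys.argv[1:]:\n"
            "    with open(name) as f:\n"
            "        sys.stdout.write(f.read())\n")

def demo():
    producers = [[sys.executable, '-c', 'print("hello")'],
                 [sys.executable, '-c', 'print("world")']]
    with named_pipes(n=2) as paths:
        # Start the reader first: opening a FIFO for writing blocks
        # until some process has opened it for reading.
        consumer = Popen([sys.executable, '-c', CONSUMER] + paths, stdout=PIPE)
        processes = []
        for path, command in zip(paths, producers):
            with open(path, 'wb', 0) as pipe:
                processes.append(Popen(command, stdout=pipe))
        output = consumer.communicate()[0]
        for p in processes:
            p.wait()
    return output
```

Note that if the consumer fails to start or never opens one of the paths, the open() call in the parent blocks indefinitely, which is one practical drawback of named pipes.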
Another, preferable way to implement the bash process substitution (no need to create a named entry on disk) is to use /dev/fd/N filenames, if they are available, as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:
$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available
If it fails, try symlinking /dev/fd to proc(5), as is done on some Linux systems:
$ ln -s /proc/self/fd /dev/fd
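The same availability check can be approximated from Python before choosing between the named-pipe and /dev/fd approaches. This is a rough equivalent of the shell test above, not an exact translation; dev_fd_available is a hypothetical helper name.

```python
import os

def dev_fd_available():
    """Return True if an open descriptor shows up as a readable /dev/fd/N."""
    fd = os.open(os.devnull, os.O_RDONLY)  # any open descriptor will do
    try:
        return os.access('/dev/fd/%d' % fd, os.R_OK)
    finally:
        os.close(fd)

available = dev_fd_available()
```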
Here's a /dev/fd-based implementation of the someprogram <(someprocess) <(anotherprocess) bash command:
#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None:  # still running
        process.kill()

with ExitStack() as stack:  # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1])  # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes:  # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0:  # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)
Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.
Whilst J.F. Sebastian has provided an answer using named pipes, it is also possible to do this with anonymous pipes.
import shlex
from subprocess import Popen, PIPE

inputcmd0 = "zcat hello.gz"  # gzipped file containing "hello"
inputcmd1 = "zcat world.gz"  # gzipped file containing "world"

def get_filename(file_):
    return "/dev/fd/{}".format(file_.fileno())

def get_stdout_fds(*processes):
    return tuple(p.stdout.fileno() for p in processes)

# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)

# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout),
                                   file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# pass_fds argument tells Popen to let the child process inherit the pipe's fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE,
                    pass_fds=get_stdout_fds(inputproc0, inputproc1))

output, error = someprogram.communicate()

for p in [inputproc0, inputproc1, someprogram]:
    p.wait()

assert output == b"hello\nworld\n"
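The anonymous-pipe approach can be wrapped in a small reusable function. The sketch below is one possible generalization, assuming /dev/fd is available; run_with_substitutions and demo are hypothetical names, and the Python interpreter stands in for the real producers and consumer so that the example runs without Sailfish or any .gz files.

```python
import sys
from subprocess import Popen, PIPE

def run_with_substitutions(make_args, producer_commands):
    """Run make_args(paths), where each path is fed by one producer command."""
    producers = [Popen(command, stdout=PIPE) for command in producer_commands]
    fds = [p.stdout.fileno() for p in producers]
    paths = ['/dev/fd/%d' % fd for fd in fds]
    # pass_fds lets the consumer inherit the read ends of the pipes
    consumer = Popen(make_args(paths), stdout=PIPE, pass_fds=fds)
    for p in producers:
        p.stdout.close()  # the parent no longer needs the read ends
    output = consumer.communicate()[0]
    for p in producers:
        p.wait()
    return consumer.returncode, output

# Stand-in consumer: prints the contents of each file it is given.
CONSUMER = ("import sys\n"
            "for name in sys.argv[1:]:\n"
            "    with open(name) as f:\n"
            "        sys.stdout.write(f.read())\n")

def demo():
    return run_with_substitutions(
        lambda paths: [sys.executable, '-c', CONSUMER] + paths,
        [[sys.executable, '-c', 'print("hello")'],
         [sys.executable, '-c', 'print("world")']])

# A Sailfish call might then look like this (untested; options are elided
# as in the question and 'output_dir' is a placeholder):
# run_with_substitutions(
#     lambda paths: ['sailfish', 'quant', '-1', paths[0], '-2', paths[1],
#                    '-o', 'output_dir'],
#     [['gunzip', '-c', 'sample1a.fastq.gz', 'sample1b.fastq.gz'],
#      ['gunzip', '-c', 'sample2a.fastq.gz', 'sample2b.fastq.gz']])
```

Taking a callable rather than a fixed argument list lets the caller place the /dev/fd paths wherever the target program expects them, for example after -1 and -2.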