 

Multiple processes reading from the same pipe asynchronously

Tags:

python

linux

How can I feed data from one pipe to three different processes?

import os
from subprocess import Popen, PIPE

nulfp = open(os.devnull, "w")

piper = Popen([
    "some command",
    "some params"
], stdout=PIPE, stderr=nulfp.fileno())

pipe_consumer_1 = Popen([
    "some command",
    "some params"
], stdin=piper.stdout, stderr=nulfp.fileno())

pipe_consumer_2 = Popen([
    "some command",
    "some params"
], stdin=piper.stdout, stderr=nulfp.fileno())

pipe_consumer_3 = Popen([
    "some command",
    "some params"
], stdin=piper.stdout, stderr=nulfp.fileno())

pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper.communicate()

If I run the code above, it produces a corrupted file, meaning the pipe consumers are probably not each reading the full output from the piper.

This one works properly but is much slower:

nulfp = open(os.devnull, "w")

piper_1 = Popen([
    "some command",
    "some params"
], stdout=PIPE, stderr=nulfp.fileno())

piper_2 = Popen([
    "some command",
    "some params"
], stdout=PIPE, stderr=nulfp.fileno())

piper_3 = Popen([
    "some command",
    "some params"
], stdout=PIPE, stderr=nulfp.fileno())

pipe_consumer_1 = Popen([
    "some command",
    "some params"
], stdin=piper_1.stdout, stderr=nulfp.fileno())

pipe_consumer_2 = Popen([
    "some command",
    "some params"
], stdin=piper_2.stdout, stderr=nulfp.fileno())

pipe_consumer_3 = Popen([
    "some command",
    "some params"
], stdin=piper_3.stdout, stderr=nulfp.fileno())

pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper_1.communicate()
piper_2.communicate()
piper_3.communicate()

Any suggestions on how to make the first snippet behave the same way as the second? If I can get the first approach to work, the job would finish in a third of the time.

asked Apr 13 '26 01:04 by Richard Knop

2 Answers

This version only uses a single-byte "block", but you get the idea.

from subprocess import Popen, PIPE

cat_proc = '/usr/bin/cat'

consumers = (Popen([cat_proc], stdin=PIPE, stdout=open('consumer1', 'w')),
             Popen([cat_proc], stdin=PIPE, stdout=open('consumer2', 'w')),
             Popen([cat_proc], stdin=PIPE, stdout=open('consumer3', 'w')))

with open('inputfile', 'rb') as infile:
    while True:
        byte = infile.read(1)  # a single-byte "block"
        if not byte:
            break
        for consumer in consumers:
            consumer.stdin.write(byte)

for consumer in consumers:
    consumer.stdin.close()  # send EOF so each cat can exit
    consumer.wait()

When testing, the consumer output files match the input file.

Edit: Here is a version that reads from a process in 1 KB blocks.

from subprocess import Popen, PIPE

cat_proc = '/usr/bin/cat'

consumers = (Popen([cat_proc], stdin=PIPE, stdout=open('consumer1', 'w')),
             Popen([cat_proc], stdin=PIPE, stdout=open('consumer2', 'w')),
             Popen([cat_proc], stdin=PIPE, stdout=open('consumer3', 'w')))

producer = Popen([cat_proc, 'inputfile'], stdout=PIPE)

while True:
    chunk = producer.stdout.read(1024)
    if not chunk:
        break
    for consumer in consumers:
        consumer.stdin.write(chunk)

for consumer in consumers:
    consumer.stdin.close()  # send EOF so each cat can exit
    consumer.wait()
producer.wait()

answered Apr 15 '26 17:04 by tMC


Data from a pipe can only be read once, and it is removed from the buffer once it has been read. This means the consumer processes each see only random parts of the data that, combined, would give the complete stream. Of course this is not very useful for you.
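
This is easy to see with a bare OS pipe (an illustrative sketch, not part of the original snippets):

```python
import os

# Two reads on the same pipe: the second read does NOT get a copy
# of what the first read already consumed.
r, w = os.pipe()
os.write(w, b"abcdef")
os.close(w)

first = os.read(r, 3)   # takes the first 3 bytes out of the buffer
second = os.read(r, 3)  # gets the next 3 bytes, not a copy
os.close(r)

print(first, second)    # b'abc' b'def'
```

Three consumers sharing one pipe end up in exactly this situation, each stealing chunks from the others.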

You could have the producer process write to subprocess.PIPE, read from this pipe in chunks into a buffer, and write that buffer to all consumer processes. This means you have to do all the buffer handling yourself. It's probably easier to use tee to do this job for you; I'll post some example code shortly.
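
As a rough sketch of the tee approach using named pipes (FIFOs); the cat producer and wc -c consumers here are placeholders I chose for illustration, not the asker's real commands:

```python
import os
import tempfile
from subprocess import Popen, PIPE, DEVNULL

with tempfile.TemporaryDirectory() as workdir:
    # One named pipe (FIFO) per consumer.
    fifos = [os.path.join(workdir, f"fifo{i}") for i in range(3)]
    for path in fifos:
        os.mkfifo(path)

    # Some data to feed through the pipeline: 1000 * 12 = 12000 bytes.
    infile = os.path.join(workdir, "input.dat")
    with open(infile, "wb") as f:
        f.write(b"hello pipes\n" * 1000)

    # Start the consumers first; each shell opens its FIFO for
    # reading and blocks until tee opens the write end.
    outs = [os.path.join(workdir, f"out{i}") for i in range(3)]
    consumers = [
        Popen(["sh", "-c", f"wc -c < {fifo} > {out}"])
        for fifo, out in zip(fifos, outs)
    ]

    # Equivalent of: producer | tee fifo0 fifo1 fifo2 > /dev/null
    producer = Popen(["cat", infile], stdout=PIPE)
    tee = Popen(["tee", *fifos], stdin=producer.stdout, stdout=DEVNULL)
    producer.stdout.close()  # tee now holds the only read end

    producer.wait()
    tee.wait()
    for c in consumers:
        c.wait()

    # Every consumer saw the complete 12000-byte stream.
    counts = [open(out).read().strip() for out in outs]
```

tee does the fan-out and buffer handling in C, so Python never touches the data; each consumer reads the full stream from its own FIFO.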

answered Apr 15 '26 19:04 by Sven Marnach


