I wish to have a single-producer, multiple-consumer architecture in Python, using multi-threaded programming. I need all the consumers to get the same data from the producer.
When I used Queue to perform this, I realized that all but the first consumer would be starved with the implementation I have.
One possible solution is to have a unique queue for each consumer thread, with the producer pushing the same data into every queue. Is there a better way to do this?
from threading import Thread
from queue import Queue

# single queue shared by the producer and both consumers
my_queue = Queue(0)

def Producer():
    global my_queue
    my_list = []
    for each in range(50):
        my_list.append(each)
    my_queue.put(my_list)

def Consumer1():
    print("Consumer1")
    global my_queue
    print(my_queue.get())
    my_queue.task_done()

def Consumer2():
    print("Consumer2")
    global my_queue
    print(my_queue.get())
    my_queue.task_done()

P = Thread(name="Producer", target=Producer)
C1 = Thread(name="Consumer1", target=Consumer1)
C2 = Thread(name="Consumer2", target=Consumer2)

P.start()
C1.start()
C2.start()
In the example above, C2 blocks indefinitely because C1 consumes the single item produced by P. What I would rather want is for C1 and C2 to both be able to access the SAME data produced by P.
Thanks for any code/pointers!
A single-producer, five-consumer example, verified.
from multiprocessing import Process, JoinableQueue
import time
import os

def producer(q):
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')

def worker(q):
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()

    # daemon workers are killed automatically once the main process exits
    for i in range(5):
        Process(target=worker, args=(q,), daemon=True).start()

    producers = []
    # it is easy to extend this to multiple producers
    for i in range(1):
        p = Process(target=producer, args=(q,))
        producers.append(p)
        p.start()

    # make sure the producers are done
    for p in producers:
        p.join()

    # block until the workers have processed every item
    q.join()
    print('All work completed')
print('All work completed')
Explanation:
Your producer creates only one job to do:
my_queue.put(my_list)
For example, put my_list twice, and both consumers work:
def Producer():
    global my_queue
    my_list = []
    for each in range(50):
        my_list.append(each)
    my_queue.put(my_list)
    my_queue.put(my_list)  # put the same list twice: one item per consumer
This way you put two jobs on the queue, both carrying the same list.
However, I have to warn you: modifying the same data from different threads without synchronization is generally a bad idea.
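If the consumers do need to mutate what they receive, a minimal workaround (a sketch, assuming a flat list of ints as in your code) is to put an independent copy on the queue for each consumer:

import copy

# each consumer gets its own copy, so neither sees the other's mutations
my_queue.put(list(my_list))           # shallow copy is enough for a flat list
my_queue.put(copy.deepcopy(my_list))  # deep copy if the elements are mutable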
In any case, the single-queue approach will not work for you, since a single queue is meant to be drained by interchangeable workers running the same algorithm.
So I advise you to go ahead with a unique queue per consumer, since the other solutions are not as trivial.
How about a per-thread queue then?
As part of starting each consumer, you would also create another Queue and add it to a list of "all thread queues". Then start the producer, passing it the list of all queues, so it can push each piece of data into every one of them.
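A minimal sketch of that idea (the names and the None sentinel are illustrative, not from the original code):

from queue import Queue
from threading import Thread

NUM_CONSUMERS = 2

def producer(queues):
    # broadcast: push the same item into every consumer's queue
    for item in range(5):
        for q in queues:
            q.put(item)
    for q in queues:
        q.put(None)  # sentinel: tells each consumer to stop

def consumer(name, q):
    while True:
        item = q.get()
        if item is None:
            break
        print(name, 'got', item)

queues = [Queue() for _ in range(NUM_CONSUMERS)]
consumers = [Thread(target=consumer, args=('Consumer%d' % (i + 1), q))
             for i, q in enumerate(queues)]
for c in consumers:
    c.start()
Thread(target=producer, args=(queues,)).start()
for c in consumers:
    c.join()

Every consumer sees every item, which is the broadcast behaviour the question asks for; a single shared Queue instead hands each item to exactly one consumer.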