I have some producer function which rely on I/O heavy blocking calls and some consumer functions which too rely on I/O heavy blocking calls. In order to speed them up, I used the Gevent micro-threading library as glue.
Here's what my paradigm looks like:
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
producers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()
def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
#This doesnt work.
for j in range(2):
producers.append(gevent.spawn(producer))
#Uncommenting this makes this script work.
#producer()
q.join()
I have four consumer and would like to have two producers. The producers exit when they a signal i.e. 10. The consumers keep feeding off this queue and the whole task finishes when the producers and consumers are over.
However, this doesn't work. If I comment out the for
loop which spawns multiple producers and use only a single producer, the script runs fine.
I can't seem to figure out what I've done wrong.
Any ideas?
Thanks
You don't actually want to quit when the queue has no unfinished work, because conceptually that's not when the application should finish.
You want to quit when the producers have finished, and then when there is no unfinished work.
# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
I think it does q.join()
before anything is put in the queue and exits immediately. Try joining all producers before joining queue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With