
How can I implement a multi-producer, multi-consumer paradigm in Gevent?

Tags: python, gevent

I have some producer functions that rely on I/O-heavy blocking calls, and some consumer functions that also rely on I/O-heavy blocking calls. To speed them up, I used the Gevent micro-threading library as glue.

Here's what my paradigm looks like:

import random

import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    # Simulate a blocking I/O call.
    gevent.sleep(random.randint(0, 2))
    print('Task', value, 'done', wid)

def worker(wid):
    # Consume items forever; task_done() is called even if do_work() fails.
    while True:
        item = q.get()
        try:
            print("Got item %s" % item)
            do_work(wid, item)
        finally:
            print("No more items")
            q.task_done()


def producer():
    # Produce random items until the value 10 comes up, which acts as the stop signal.
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print("Signal Received")
            return
        else:
            print("Added item %s" % item)
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

I have four consumers and would like to have two producers. The producers exit when they receive a signal, i.e. when the random value is 10. The consumers keep feeding off this queue, and the whole task finishes when both the producers and the consumers are done.

However, this doesn't work. If I comment out the for loop which spawns multiple producers and use only a single producer, the script runs fine.

I can't seem to figure out what I've done wrong.

Any ideas?

Thanks

Mridang Agarwalla asked Feb 09 '12 12:02


2 Answers

You don't actually want to quit when the queue has no unfinished work, because conceptually that's not when the application should finish.

You want to quit once the producers have finished and, after that, once there is no unfinished work left in the queue.

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
Ivo answered Dec 01 '22 01:12
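
Putting the shutdown order from the answer above into a complete script, a minimal sketch might look like the following. It assumes Python 3 and an installed gevent. The fixed item count per producer, the short sleeps, and the `processed` list are illustrative additions that keep the run deterministic; they are not part of the original question.

```python
import random

import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
processed = []  # hypothetical bookkeeping list, added only for this sketch


def worker(wid):
    # Consumers loop forever; the main greenlet kills them at shutdown.
    while True:
        item = q.get()
        try:
            gevent.sleep(random.random() / 100)  # stand-in for blocking I/O
            processed.append((wid, item))
        finally:
            q.task_done()


def producer(count):
    # Assumption: each producer enqueues a fixed number of items, then returns.
    for _ in range(count):
        q.put(random.randint(1, 9))
        gevent.sleep(0)  # yield so other greenlets get a turn


workers = [gevent.spawn(worker, wid) for wid in range(4)]
producers = [gevent.spawn(producer, 5) for _ in range(2)]

gevent.joinall(producers)  # 1. wait until nothing more will be enqueued
q.join()                   # 2. wait until every enqueued item is task_done()
gevent.killall(workers)    # 3. stop the idle consumers blocked in q.get()

print(len(processed))
```

The order matters: joining the queue first would return while the producers have not yet run, which is the failure described in the question.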


I think it calls q.join() before anything has been put in the queue, so it exits immediately. Try joining all the producers before joining the queue.

zch answered Dec 01 '22 01:12
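
The "exits immediately" behaviour described in this answer can be reproduced in isolation. The sketch below uses the standard library's `queue.Queue` rather than gevent, on the assumption that gevent's `JoinableQueue` follows the same `join()` / `task_done()` contract; the variable names are made up for illustration.

```python
import queue

q = queue.Queue()

# Nothing has been put yet, so there is no unfinished work:
# join() returns at once instead of waiting for future items.
q.join()
joined_while_empty = True

q.put("job")
pending_after_put = q.unfinished_tasks   # items put but not yet task_done()

q.get()
q.task_done()
q.join()                                 # returns: one put, one task_done()
pending_after_done = q.unfinished_tasks
```

In the question's script the spawned producers have not had a chance to run when the main greenlet reaches `q.join()`, so the queue is in the same empty state as in the first `join()` call here.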