 

Python 2.6 multiprocessing.Queue compatible with threads?

I am experimenting with the new multiprocessing module in Python 2.6. I am creating several processes, each with its own multiprocessing.JoinableQueue instance. Each process spawns one or more worker threads (subclasses of threading.Thread) that share that process's JoinableQueue instance (passed in through each Thread's __init__ method). It generally seems to work, but occasionally and unpredictably it fails with the following error:

  File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
    self.queue.task_done()
  File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

Each of my get() calls is immediately followed by a task_done() call, so the two counts should always match. Anecdotally, the error seems to occur only when the work done between the get() and the task_done() is VERY quick. Inserting a small time.sleep(0.01) seems to alleviate the problem.
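For reference, here is a minimal Python 3 sketch of the pattern described above, with the outer process layer left out: several threading.Thread workers in one process share a single multiprocessing.JoinableQueue, and every get() is paired with exactly one task_done() in a finally block. The names `worker` and `run_threads`, the doubling "work" and the `None` stop sentinel are illustrative assumptions, not taken from the original script.

```python
import multiprocessing
import threading


def worker(queue, results, lock):
    """Hypothetical thread body: process items until a None sentinel arrives."""
    while True:
        item = queue.get()
        try:
            if item is None:  # sentinel: this thread should stop
                return
            with lock:
                results.append(item * 2)  # the (very quick) work between get() and task_done()
        finally:
            queue.task_done()  # exactly one task_done() per successful get()


def run_threads(items, num_threads=4):
    """Share one multiprocessing.JoinableQueue between several threads of this process."""
    queue = multiprocessing.JoinableQueue()
    results = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(queue, results, lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        queue.put(item)
    for _ in threads:
        queue.put(None)  # one sentinel per thread
    queue.join()  # blocks until every put() has a matching task_done()
    for t in threads:
        t.join()
    return sorted(results)
```

Calling `run_threads(range(10))` returns the doubled values in sorted order. Putting task_done() in a finally block keeps the counts balanced even if the work raises an exception.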

Any ideas what is going on? Can I use a multiprocessing Queue with threads instead of the more traditional Queue.Queue?

Thanks!

-brian

asked Dec 05 '08 at 00:12 by user43233



2 Answers

I haven't experimented with multiprocessing in 2.6 yet, but I played a lot with pyprocessing (as it was called in 2.5).

I see that you want a number of processes, each spawning its own set of threads.

Since you are already using the multiprocessing module, I suggest a multi-process approach rather than a multi-threaded one; you will hit fewer problems such as deadlocks.

Create a queue object. http://pyprocessing.berlios.de/doc/queue-objects.html

To create a multi-process environment, use a pool (http://pyprocessing.berlios.de/doc/pool-objects.html), which manages the worker processes for you. You can then submit work to the workers synchronously or asynchronously, and attach a callback to each task if required. But remember that the callback runs in the single thread that handles results for the whole pool, so it should return immediately (as mentioned in the documentation).
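In the standard-library multiprocessing module the pool is multiprocessing.Pool. The following is a minimal Python 3 sketch, assuming a hypothetical CPU-bound function `square` and a helper `run_pool` (neither is from the answer), showing one synchronous apply() call and several apply_async() calls whose callback only appends to a list.

```python
import multiprocessing


def square(x):
    """Hypothetical task executed in a worker process (must be module-level to be picklable)."""
    return x * x


def run_pool(values, processes=2):
    collected = []
    pool = multiprocessing.Pool(processes=processes)
    try:
        # Synchronous: apply() blocks until the worker returns the result.
        sync_result = pool.apply(square, (10,))
        # Asynchronous: apply_async() returns at once; the callback is later called
        # in the pool's result-handling thread, so it only does a quick append.
        for v in values:
            pool.apply_async(square, (v,), callback=collected.append)
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for outstanding tasks and their callbacks
    return sync_result, sorted(collected)


if __name__ == '__main__':  # required on Windows, where workers re-import this module
    print(run_pool([1, 2, 3]))
```

Results from apply_async() can arrive in any order, which is why the sketch sorts them before returning.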

Some additional info: if required, create a manager (http://pyprocessing.berlios.de/doc/manager-objects.html) to manage access to the queue object. You will have to make the queue object shared for this, but the advantage is that, once it is shared and managed, you can access the queue from anywhere on the network by creating proxy objects. This lets you call methods of a centralized shared queue object as if they were native methods, from any network node.
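Within a single machine, a managed queue is also the usual way to hand a queue to pool workers: a plain multiprocessing.Queue passed as a task argument raises a RuntimeError, while the proxy returned by a Manager can be pickled and sent to workers. A minimal Python 3 sketch follows; `produce` and `run_managed` are hypothetical names chosen for illustration.

```python
import multiprocessing


def produce(queue, n):
    """Hypothetical pool task: put n items on the shared, managed queue."""
    for i in range(n):
        queue.put(i)
    return n


def run_managed(num_tasks=3, items_per_task=2):
    manager = multiprocessing.Manager()  # starts a separate server process
    try:
        queue = manager.Queue()  # a proxy object, so it can be passed to pool tasks
        pool = multiprocessing.Pool(processes=2)
        try:
            async_results = [pool.apply_async(produce, (queue, items_per_task))
                             for _ in range(num_tasks)]
            total = sum(r.get() for r in async_results)  # wait for every task
        finally:
            pool.close()
            pool.join()
        # Drain the queue in the parent process through the same proxy.
        return sorted(queue.get() for _ in range(total))
    finally:
        manager.shutdown()  # stop the manager's server process


if __name__ == '__main__':
    print(run_managed())
```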

Here is a code example from the documentation:

It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it). Running the following commands creates a server for a shared queue which remote clients can use:

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

One client can access the server as follows:

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')
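The quoted example uses the old `processing` package and its `CreatorMethod`; in the multiprocessing module the same idea is expressed with `BaseManager.register()`, `get_server()` and `connect()`. The Python 3 sketch below is self-contained only because it runs the server in a daemon thread of the same process; normally the server and the client are separate programs, and the client registers `'get_queue'` without a callable. The names `shared_queue`, `start_server` and `client_put`, the `b'secret'` authkey, and binding to port 0 on 127.0.0.1 are assumptions for illustration.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

shared_queue = queue.Queue()  # the object the server hands out to clients


class QueueManager(BaseManager):
    pass


# The callable is only used on the server side; a remote client would register
# 'get_queue' without one.
QueueManager.register('get_queue', callable=lambda: shared_queue)


def start_server(address=('127.0.0.1', 0), authkey=b'secret'):
    """Run a manager server in a daemon thread; return the address it is bound to."""
    manager = QueueManager(address=address, authkey=authkey)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address  # port 0 means the OS picked a free port


def client_put(address, item, authkey=b'secret'):
    """Connect to a running server and put one item on the shared queue."""
    manager = QueueManager(address=address, authkey=authkey)
    manager.connect()
    remote_queue = manager.get_queue()  # proxy for shared_queue on the server
    remote_queue.put(item)
    return remote_queue.qsize()
```

Usage: `address = start_server()` followed by `client_put(address, 'hello')`; because the server lives in this process, `shared_queue.get()` then returns `'hello'`.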

If you insist on safe threading, PEP 371 (multiprocessing) references this project: http://code.google.com/p/python-safethread/

answered Nov 09 '22 at 04:11 by JV.



You should pass Queue objects to the process's target function as arguments.

Example from multiprocessing's documentation:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.
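The same argument-passing approach works for the JoinableQueue from the question, with processes instead of threads as the consumers. A minimal Python 3 sketch, assuming hypothetical names `consumer` and `run_consumers` and a `None` sentinel per process: each get() is matched by one task_done(), and the results are drained before the processes are joined, since joining a process that still has queued data to flush can deadlock.

```python
import multiprocessing


def consumer(tasks, results):
    """Hypothetical worker process: square numbers until a None sentinel arrives."""
    while True:
        item = tasks.get()
        try:
            if item is None:
                return
            results.put(item * item)
        finally:
            tasks.task_done()  # one task_done() per get(), including the sentinel


def run_consumers(values, num_procs=2):
    values = list(values)
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=consumer, args=(tasks, results))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for v in values:
        tasks.put(v)
    for _ in procs:
        tasks.put(None)  # one sentinel per process
    tasks.join()  # returns once every item has been marked done
    squares = sorted(results.get() for _ in values)  # drain before joining
    for p in procs:
        p.join()
    return squares


if __name__ == '__main__':
    print(run_consumers([1, 2, 3, 4]))
```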

answered Nov 09 '22 at 02:11 by jfs
