I'm implementing a producer-consumer pattern in python using multiprocessing.Pool
and multiprocessing.Queue
. Consumers are pre-forked processes that uses gevent
to spawn multiple tasks.
Here is a trimmed down version of code:
import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time
# Task queue
queue = Queue()
def init_worker ():
# Ignore signals in worker
signal.signal( signal.SIGTERM, signal.SIG_IGN )
signal.signal( signal.SIGINT, signal.SIG_IGN )
signal.signal( signal.SIGQUIT, signal.SIG_IGN )
# One of the worker task
def worker_task1( ):
while True:
try:
m = queue.get( timeout = 2 )
# Break out if producer says quit
if m == 'QUIT':
print 'TIME TO QUIT'
break
except QueueEmpty:
pass
# Worker
def work( ):
gevent.joinall([
gevent.spawn( worker_task1 ),
])
pool = Pool( 2, init_worker )
for i in xrange( 2 ):
pool.apply_async( work )
try:
while True:
queue.put( 'Some Task' )
time.sleep( 2 )
except KeyboardInterrupt as e:
print 'STOPPING'
# Signal all workers to quit
for i in xrange( 2 ):
queue.put( 'QUIT' )
pool.join()
Now when I try to quit it, I get following state:
futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...
.So what is the correct way to end such a process cleanly?
Terminating processes in Python We can kill or terminate a process immediately by using the terminate() method. We will use this method to terminate the child process, which has been created with the help of function, immediately before completing its execution.
Close a Python Multiprocessing Queue If you want no process should write into a multiprocessing queue, you can close the queue using the close() method. The close() method, when invoked on a multiprocessing queue in any of the processes, closes the queue. After this, no process can insert an element into the queue.
A process can be killed by calling the Process. kill() function. The call will only terminate the target process, not child processes. The method is called on the multiprocessing.
Python provides a mutual exclusion lock for use with processes via the multiprocessing. Lock class. An instance of the lock can be created and then acquired by processes before accessing a critical section, and released after the critical section. Only one process can have the lock at any time.
I figured out the problem. According to documentation for multiprocessing.Pool.join()
, pool
needs to be close()ed
before it can be join()ed
. Adding pool.close()
before pool.join()
solved the problem.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With