
What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop?


I'm implementing a producer-consumer pattern in Python using multiprocessing.Pool and multiprocessing.Queue. The consumers are pre-forked processes that use gevent to spawn multiple tasks.

Here is a trimmed-down version of the code:

import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time

# Task queue
queue = Queue()

def init_worker ():
    # Ignore signals in worker
    signal.signal( signal.SIGTERM, signal.SIG_IGN )
    signal.signal( signal.SIGINT, signal.SIG_IGN )
    signal.signal( signal.SIGQUIT, signal.SIG_IGN )

# One of the worker tasks
def worker_task1( ):
    while True:
        try:
            m = queue.get( timeout = 2 )

            # Break out if producer says quit
            if m == 'QUIT':
                print 'TIME TO QUIT'
                break

        except QueueEmpty:
            pass

# Worker
def work( ):
    gevent.joinall([
        gevent.spawn( worker_task1 ),
    ])

pool = Pool( 2, init_worker )
for i in xrange( 2 ):
    pool.apply_async( work )

try:
    while True:
        queue.put( 'Some Task' )
        time.sleep( 2 )

except KeyboardInterrupt as e:
    print 'STOPPING'

    # Signal all workers to quit
    for i in xrange( 2 ):
        queue.put( 'QUIT' )

    pool.join()

Now when I try to quit it, I end up in the following state:

  1. The parent process is waiting for one of the children to join.
  2. One of the children is in the defunct state: it has finished, but the parent is still waiting for the other child to finish.
  3. The other child is showing: futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ....

So what is the correct way to end such a process cleanly?

Vikas asked May 21 '13 16:05




1 Answer

I figured out the problem. According to the documentation for multiprocessing.Pool.join(), the pool must be close()d (or terminate()d) before it can be join()ed. Adding pool.close() before pool.join() solved the problem.
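
For reference, here is a minimal sketch of the shutdown order described above: put one sentinel per worker, then call close(), then join(). It is not the original program. It assumes Python 3 (the question uses Python 2), leaves gevent out, and forces the "fork" start method, so it is POSIX-only. The names run_demo, consume and _init_worker are made up for illustration.

```python
import multiprocessing as mp
from queue import Empty

QUIT = "QUIT"        # sentinel value, same string as in the question
_task_queue = None   # set in every worker process by _init_worker


def _init_worker(q):
    # Hypothetical initializer: stores the shared queue in a worker-side global.
    global _task_queue
    _task_queue = q


def consume():
    """Handle tasks until the sentinel arrives; return how many were handled."""
    handled = 0
    while True:
        try:
            item = _task_queue.get(timeout=2)
        except Empty:
            continue  # nothing queued yet, keep polling
        if item == QUIT:
            return handled
        handled += 1


def run_demo(n_workers=2, n_tasks=5):
    # Assumption: "fork" is available (Linux and other POSIX systems).
    ctx = mp.get_context("fork")
    q = ctx.Queue()
    pool = ctx.Pool(n_workers, _init_worker, (q,))

    # One long-running consumer per worker process.
    results = [pool.apply_async(consume) for _ in range(n_workers)]

    for i in range(n_tasks):
        q.put("task %d" % i)

    # One sentinel per worker, queued after all real tasks.
    for _ in range(n_workers):
        q.put(QUIT)

    pool.close()  # no more work will be submitted; required before join()
    pool.join()   # returns once every worker process has exited
    return sum(r.get() for r in results)


if __name__ == "__main__":
    print(run_demo())
```

The order matters: close() tells the pool that no further tasks are coming, so its worker processes exit once the consumers return, and only then can join() complete.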

Vikas answered Oct 02 '22 19:10
