Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Keyboard Interrupt with python's multiprocessing

I am having an issues gracefully handling a Keyboard Interrupt with python's multi-processing

(Yes I know that Ctr-C should not guarantee a graceful shutdown -- but lets leave that discussion for a different thread)

Consider the following code, where I am user a multiprocessing.Manager#list() which is a ListProxy which I understood handles multi-process access to a list.

When I Ctr-C out of this -- I get a socket.error: [Errno 2] No such file or directory when trying to access the ListProxy

I would love to have the shared list not to be corrupted upon Ctr-C. Is this possible?!

Note: I want to solve this without using Pools and Queues.

from multiprocessing import Process, Manager
from time import sleep

def f(process_number, shared_array):
    try:
        print "starting thread: ", process_number
        shared_array.append(process_number)
        sleep(3)
        shared_array.append(process_number)
    except KeyboardInterrupt:
        print "Keyboard interrupt in process: ", process_number
    finally:
        print "cleaning up thread", process_number

if __name__ == '__main__':

    processes = []

    manager = Manager()
    shared_array = manager.list()

    for i in xrange(4):
        p = Process(target=f, args=(i, shared_array))
        p.start()
        processes.append(p)

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print "Keyboard interrupt in main"

    for item in shared_array:
        # raises "socket.error: [Errno 2] No such file or directory"
        print item

If you run that and then hit Ctr-C, we get the following:

starting thread:  0
starting thread:  1
starting thread:  3
starting thread:  2
^CKeyboard interupt in process:  3
Keyboard interupt in process:  0
cleaning up thread 3
cleaning up thread 0
Keyboard interupt in process:  1
Keyboard interupt in process:  2
cleaning up thread 1
cleaning up thread 2
Keyboard interupt in main
Traceback (most recent call last):
  File "multi.py", line 33, in <module>
    for item in shared_array:
  File "<string>", line 2, in __getitem__
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 169, in Client
    c = SocketClient(address)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 293, in SocketClient
    s.connect(address)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: [Errno 2] No such file or directory

(Here is another approach using a multiprocessing.Lock with similar affect ... gist) Similar Questions:

  • Catch Keyboard Interrupt to stop Python multiprocessing worker from working on queue
  • Keyboard Interrupts with python's multiprocessing Pool
  • Shared variable in python's multiprocessing
like image 743
Jonathan Avatar asked Jan 14 '14 02:01

Jonathan


1 Answers

multiprocessing.Manager() fires up a child process which is responsible for handling your shared list proxy.

netstat output while running:

unix 2 [ ACC ] STREAM LISTENING 3921657 8457/python /tmp/pymp-B9dcij/listener-X423Ml

this child process created by multiprocessing.Manager() is catching your SIGINT and exiting causing anything related to it to be dereferenced hence your "no such file" error (I also got several other errors depending on when i decided to send SIGINT).

to solve this you may directly declare a SyncManager object (instead of letting Manager() do it for you). this will require you to use the start() method to actually fire up the child process. the start() method takes an initialization function as its first argument (you can override SIGINT for the manager here).

code below, give this a try:

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager, SyncManager
from time import sleep
import signal

#handle SIGINT from SyncManager object
def mgr_sig_handler(signal, frame):
    print 'not closing the mgr'

#initilizer for SyncManager
def mgr_init():
    signal.signal(signal.SIGINT, mgr_sig_handler)
    #signal.signal(signal.SIGINT, signal.SIG_IGN) # <- OR do this to just ignore the signal
    print 'initialized mananger'

def f(process_number, shared_array):
    try:
        print "starting thread: ", process_number
        shared_array.append(process_number)
        sleep(3)
        shared_array.append(process_number)
    except KeyboardInterrupt:
        print "Keyboard interrupt in process: ", process_number
    finally:
        print "cleaning up thread", process_number

if __name__ == '__main__':

    processes = []

     #using syncmanager directly instead of letting Manager() do it for me
    manager = SyncManager()
    manager.start(mgr_init)  #fire up the child manager process
    shared_array = manager.list()

    for i in xrange(4):
        p = Process(target=f, args=(i, shared_array))
        p.start()
        processes.append(p)

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print "Keyboard interrupt in main"

    for item in shared_array:
        print item
like image 175
KorreyD Avatar answered Nov 09 '22 04:11

KorreyD