Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python multiprocessing/threading cleanup

I have a python tool, that has basically this kind of setup:

main process (P1) -> spawns a process (P2) that starts a tcp connection
                  -> spawns a thread (T1) that starts a loop to receive 
                     messages that are sent from P2 to P1 via a Queue (Q1)

server process (P2) -> spawns two threads (T2 and T3) that start loops to
                       receive messages that are sent from P1 to P2 via Queues (Q2 and Q3)

The problem I'm having is that when I stop my program (with Ctrl+C), it doesn't quit. The server process is ended, but the main process just hangs there and I have to kill it.

The thread loop functions all look the same:

def _loop(self):
    while self.running:
        res = self.Q1.get()
        if res is None:
            break
        self._handle_msg(res)

All threads are started as daemon:

t = Thread(target=self._loop)
t.setDaemon(True)
t.start()

In my main process, I use atexit, to perform clean-up tasks:

atexit.register(self.on_exit)

Those clean-up tasks are essentially the following:

1) set self.running in P1 to False and send None to Q1, so that the thread T1 finishes

self.running = False
self.Q1.put(None)

2) send a message to P2 via Q2 to inform this process that it is ending

self.Q2.put("stop")

3) In P2, react to the "stop" message and do what we did in P1

self.running = False
self.Q2.put(None)
self.Q3.put(None)

That is it and in my understanding, that should make everything shut down nicely, but it doesn't.

The main code of P1 also contains the following endless loop, because otherwise the program would end prematurely:

while running:
    sleep(1)

Maybe that has something to do with the problem, but I cannot see why it should.

So what did I do wrong? Does my setup have major design flaws? Did I forget to shut down something?

EDIT

Ok, I modified my code and managed to make it shut down correctly most of the time. Unfortunately, every now and then, it still gets stuck.

I managed to write a small working example of my code. To demonstrate what happens, you simply need to start the script and then use Ctrl + C to stop it. It looks like the issue now usually appears if you press Ctrl + C as soon as possible after starting the tool.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import sys
import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


logger = logging.getLogger("mepy-client")


class SocketClientProtocol(object):
    """Child-process side of the queue protocol.

    Reads messages from ``q_in`` and ``q_binary`` on two separate threads
    and writes outgoing messages to ``q_out``.
    """

    def __init__(self, q_in, q_out, q_binary):
        """Store the queues, mark the instance running and start both readers."""
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        # Reader for q_in (non-daemon: the daemon flag is commented out).
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()
        # Reader for q_binary (also non-daemon).
        t = Thread(target=self._loop_binary)
        #t.setDaemon(True)
        t.start()

    def _loop(self):
        """Pass q_in messages to ``_handle_msg`` until stopped or ``None`` is read."""
        print "start of loop 2"
        while self.running:
            # Blocks until a message is available.
            res = self.q_in.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 2"

    def _loop_binary(self):
        """Pass q_binary items to ``_handle_binary`` until stopped or ``None`` is read."""
        print "start of loop 3"
        while self.running:
            # Blocks until an item is available.
            res = self.q_binary.get()
            if res is None:
                break
            self._handle_binary(res)
        print "end of loop 3"

    def _handle_msg(self, msg):
        """React to a message whose first element is its type.

        Only ``"stop2"`` is acted on: it clears ``running`` and puts a
        ``None`` sentinel on both inbound queues. Other types are ignored.
        """
        msg_type = msg[0]
        if msg_type == "stop2":
            print "STOP RECEIVED"
            self.running = False
            # The sentinels wake a reader that is blocked in get().
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        """Put ``msg`` on the outgoing queue ``q_out``."""
        self.q_out.put(msg)

    def _handle_binary(self, data):
        """Placeholder: binary items are currently discarded."""
        pass

    def handle_element(self):
        """Send the fixed message ``["something"]`` on ``q_out``."""
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    """Entry point of the child process.

    Builds a SocketClientProtocol on the three queues and, while it is
    running, calls ``handle_element()`` after each two-second sleep.
    """
    s = SocketClientProtocol(q_in, q_out, q_binary)
    # s.running is cleared by the protocol when it handles a "stop2" message.
    while s.running:
        sleep(2)
        s.handle_element()


class MediatorSender(object):
    """Parent-process side: owns the queues, the child process and a reader thread."""

    def __init__(self):
        """Create an idle sender; queues and process are created by ``start()``."""
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        """Create the queues, launch the child process and the q_out reader.

        Does nothing if the sender is already running.
        """
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print "!!!!START"
        # The child runs run_twisted with the same three queues.
        self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        # Non-daemon reader thread for q_out (daemon flag is commented out).
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()

    def stop(self):
        """Stop the local reader, ask the child to stop, then terminate it.

        Does nothing beyond printing if the sender is not running.
        """
        print "!!!!STOP"
        if not self.running:
            return
        print "STOP2"
        self.running = False
        # Sentinel for the local q_out reader, stop request for the child.
        self.q_out.put(None)
        self.q_in.put(["stop2"])
        #self.q_in.put(None)
        #self.q_binary.put(None)

        # NOTE(review): terminate() is issued right after queueing "stop2",
        # so the child may be killed while it is still using the queues;
        # any exception raised here is silently swallowed.
        try:
            if self.p and self.p.is_alive():
                self.p.terminate()
        except:
            pass

    def _loop(self):
        """Pass q_out messages to ``_handle_msg`` until stopped or ``None`` is read."""
        print "start of loop 1"
        while self.running:
            # Blocks until a message is available.
            res = self.q_out.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 1"

    def _handle_msg(self, msg):
        """Send a message received on q_out back to the child via q_in."""
        self._put_msg(msg)

    def _put_msg(self, msg):
        """Put ``msg`` on ``q_in``."""
        self.q_in.put(msg)

    def _put_binary(self, msg):
        """Put ``msg`` on ``q_binary``."""
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        """Queue ``chunk`` for the child on ``q_binary``."""
        self._put_binary(chunk)

# Module-level flag polled by the main loop; cleared by the first SIGINT.
running = True
def signal_handler(signal, frame):
    """SIGINT handler: stop the sender on the first call, exit on a later one.

    Relies on the module-level ``ms`` created in the ``__main__`` block.
    """
    global running
    if running:
        running = False
        ms.stop()
    else:
        # A previous call already requested the stop; exit immediately.
        sys.exit(0)

if __name__ == "__main__":
    # The handler is installed before the child process is started.
    signal.signal(signal.SIGINT, signal_handler)
    ms = MediatorSender()
    ms.start()
    # Queue 100 identical chunks for the child.
    for i in range(100):
        ms.send_chunk("some chunk of data")
    # Keep the main thread alive until signal_handler clears `running`.
    while running:
        sleep(1)
like image 707
basilikum Avatar asked Oct 20 '22 12:10

basilikum


2 Answers

I think you're corrupting your multiprocessing.Queue by calling p.terminate() on the child process. The docs have a warning about this:

Warning: If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

In some cases, it looks like p is terminating before your MediatorSender._loop method can consume the sentinel you loaded into it to let it know that it should exit.

Also, you're installing a signal handler that expects to work in the main process only, but the SIGINT is actually received by both the parent and the child processes. That means signal_handler gets called in both processes, which could result in ms.stop getting called twice, due to a race condition in the way you handle setting ms.running to False.

I would recommend just exploiting the fact that both processes receive the SIGINT, and having both the parent and the child handle KeyboardInterrupt directly. That way, each process shuts itself down cleanly, rather than having the parent terminate the child. The following code demonstrates that, and in my testing never hung. I've simplified your code in a few places, but functionally it's exactly the same:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):
    """Child-process endpoint: consumes q_in and q_binary, publishes on q_out.

    Two reader threads are started on construction; each one runs until it
    reads a ``None`` sentinel from its queue.
    """

    def __init__(self, q_in, q_out, q_binary):
        """Keep the three queues and start one reader thread per inbound queue."""
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        for reader in (self._loop, self._loop_binary):
            Thread(target=reader).start()

    @staticmethod
    def _consume(source, handler):
        """Feed every item of ``source`` to ``handler`` until ``None`` arrives."""
        for item in iter(source.get, None):
            handler(item)

    def _loop(self):
        """Reader for q_in; each message goes to ``_handle_msg``."""
        print("start of loop 2")
        self._consume(self.q_in, self._handle_msg)
        print("end of loop 2")

    def _loop_binary(self):
        """Reader for q_binary; each item goes to ``_handle_binary``."""
        print("start of loop 3")
        self._consume(self.q_binary, self._handle_binary)
        print("end of loop 3")

    def _handle_msg(self, msg):
        """On a ``"stop2"`` message, push the sentinel to both inbound queues."""
        if msg[0] != "stop2":
            return
        self.q_in.put(None)
        self.q_binary.put(None)

    def _put_msg(self, msg):
        """Publish ``msg`` on q_out."""
        self.q_out.put(msg)

    def stop(self):
        """Ask both reader threads to finish by queueing a ``None`` for each."""
        print("STOP RECEIVED")
        for inbound in (self.q_in, self.q_binary):
            inbound.put(None)

    def _handle_binary(self, data):
        """Binary items are accepted and dropped."""
        return None

    def handle_element(self):
        """Publish the fixed ``["something"]`` message."""
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    """Child-process entry point.

    Creates the protocol object and emits one element every two seconds
    until a KeyboardInterrupt arrives, at which point the protocol is
    stopped.
    """
    protocol = SocketClientProtocol(q_in, q_out, q_binary)
    try:
        while True:
            sleep(2)
            protocol.handle_element()
    except KeyboardInterrupt:
        # Ctrl+C reached this process: shut the reader threads down.
        protocol.stop()

class MediatorSender(object):
    """Parent-process endpoint that owns the queues, the child and a reader thread."""

    def __init__(self):
        """Start idle: no queues, no child process, not running."""
        self.q_in = self.q_out = self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        """Create the queues, spawn the child and start the q_out reader.

        A second call while already running is ignored.
        """
        if self.running:
            return
        self.running = True
        self.q_in, self.q_out, self.q_binary = Queue(), Queue(), Queue()
        print("!!!!START")
        shared_queues = (self.q_in, self.q_out, self.q_binary)
        self.p = Process(target=run_twisted, args=shared_queues)
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        """End the q_out reader by queueing its ``None`` sentinel (once)."""
        print("!!!!STOP")
        if self.running:
            print("STOP2")
            self.running = False
            self.q_out.put(None)

    def _loop(self):
        """Hand every q_out message to ``_handle_msg`` until ``None`` is read."""
        print("start of loop 1")
        for incoming in iter(self.q_out.get, None):
            self._handle_msg(incoming)
        print("end of loop 1")

    def _handle_msg(self, msg):
        """Echo a message from the child back to it."""
        self._put_msg(msg)

    def _put_msg(self, msg):
        """Queue ``msg`` on q_in."""
        self.q_in.put(msg)

    def _put_binary(self, msg):
        """Queue ``msg`` on q_binary."""
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        """Send one chunk of binary data to the child."""
        self._put_binary(chunk)

if __name__ == "__main__":
    ms = MediatorSender()
    try:
        ms.start()
        for _ in range(100):
            ms.send_chunk("some chunk of data")
        # You actually have to join w/ a timeout in a loop on
        # Python 2.7. If you just call join(), SIGINT won't be
        # received by the main process, and the program will
        # hang. This is a bug, and is fixed in Python 3.x.
        # Waiting in one-second slices keeps the main thread
        # interruptible, and checking is_alive() lets the loop end
        # instead of spinning once the reader thread has finished.
        while ms.loop.is_alive():
            ms.loop.join(1)
    except KeyboardInterrupt:
        # Ctrl+C in the parent: release the q_out reader thread.
        ms.stop()

Edit:

If you prefer to use a signal handler rather than catching KeyboardInterrupt, you just need to make sure the child process uses its own signal handler, rather than inheriting the parent's:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import logging
from functools import partial
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):
    """Child-process endpoint: consumes q_in and q_binary, publishes on q_out.

    Two reader threads are started on construction; each runs until it
    reads a ``None`` sentinel. ``running`` stays true until ``stop()``.
    """

    def __init__(self, q_in, q_out, q_binary):
        """Keep the queues, set ``running`` and start both reader threads."""
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        for reader in (self._loop, self._loop_binary):
            Thread(target=reader).start()

    @staticmethod
    def _consume(source, handler):
        """Feed every item of ``source`` to ``handler`` until ``None`` arrives."""
        for item in iter(source.get, None):
            handler(item)

    def _loop(self):
        """Reader for q_in; each message goes to ``_handle_msg``."""
        print("start of loop 2")
        self._consume(self.q_in, self._handle_msg)
        print("end of loop 2")

    def _loop_binary(self):
        """Reader for q_binary; each item goes to ``_handle_binary``."""
        print("start of loop 3")
        self._consume(self.q_binary, self._handle_binary)
        print("end of loop 3")

    def _handle_msg(self, msg):
        """On a ``"stop2"`` message, push the sentinel to both inbound queues."""
        if msg[0] != "stop2":
            return
        self.q_in.put(None)
        self.q_binary.put(None)

    def _put_msg(self, msg):
        """Publish ``msg`` on q_out."""
        self.q_out.put(msg)

    def stop(self):
        """Clear ``running`` and queue a ``None`` for each reader thread."""
        print("STOP RECEIVED")
        self.running = False
        for inbound in (self.q_in, self.q_binary):
            inbound.put(None)

    def _handle_binary(self, data):
        """Binary items are accepted and dropped."""
        return None

    def handle_element(self):
        """Publish the fixed ``["something"]`` message."""
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    """Child-process entry point.

    Creates the protocol object, replaces the SIGINT handler with one
    that stops this protocol, then emits one element every two seconds
    for as long as the protocol reports that it is running.
    """
    protocol = SocketClientProtocol(q_in, q_out, q_binary)
    on_sigint = partial(signal_handler_child, protocol)
    signal.signal(signal.SIGINT, on_sigint)
    while protocol.running:
        sleep(2)
        protocol.handle_element()

class MediatorSender(object):
    """Parent-process endpoint that owns the queues, the child and a reader thread."""

    def __init__(self):
        """Start idle: no queues, no child process, not running."""
        self.q_in = self.q_out = self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        """Create the queues, spawn the child and start the q_out reader.

        A second call while already running is ignored.
        """
        if self.running:
            return
        self.running = True
        self.q_in, self.q_out, self.q_binary = Queue(), Queue(), Queue()
        print("!!!!START")
        shared_queues = (self.q_in, self.q_out, self.q_binary)
        self.p = Process(target=run_twisted, args=shared_queues)
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        """End the q_out reader by queueing its ``None`` sentinel (once)."""
        print("!!!!STOP")
        if self.running:
            print("STOP2")
            self.running = False
            self.q_out.put(None)

    def _loop(self):
        """Hand every q_out message to ``_handle_msg`` until ``None`` is read."""
        print("start of loop 1")
        for incoming in iter(self.q_out.get, None):
            self._handle_msg(incoming)
        print("end of loop 1")

    def _handle_msg(self, msg):
        """Echo a message from the child back to it."""
        self._put_msg(msg)

    def _put_msg(self, msg):
        """Queue ``msg`` on q_in."""
        self.q_in.put(msg)

    def _put_binary(self, msg):
        """Queue ``msg`` on q_binary."""
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        """Send one chunk of binary data to the child."""
        self._put_binary(chunk)

def signal_handler_main(ms, *args):
    """SIGINT handler for the parent process.

    ``ms`` is bound in advance with ``functools.partial``; the remaining
    positional arguments supplied by the signal machinery are ignored.
    """
    # Only the pre-bound sender matters here.
    ms.stop()


def signal_handler_child(s, *args):
    """SIGINT handler for the child process.

    ``s`` is bound in advance with ``functools.partial``; the remaining
    positional arguments supplied by the signal machinery are ignored.
    """
    # Only the pre-bound protocol object matters here.
    s.stop()

if __name__ == "__main__":
    ms = MediatorSender()
    # Route Ctrl+C in the parent to ms.stop().
    on_sigint = partial(signal_handler_main, ms)
    signal.signal(signal.SIGINT, on_sigint)
    ms.start()
    for i in range(100):
        ms.send_chunk("some chunk of data")
    # Wait with a timeout so the main thread can still run the handler.
    reader = ms.loop
    while reader.is_alive():
        reader.join(9999999)
    print('done main')
like image 169
dano Avatar answered Oct 22 '22 01:10

dano


Maybe you should try to capture SIGINT signal, which is generated by Ctrl + C using signal.signal like this:

#!/usr/bin/env python
import signal
import sys
def signal_handler(signal, frame):
    """Announce the Ctrl+C and exit the process with status 0."""
    print('You pressed Ctrl+C!')
    raise SystemExit(0)
# Install signal_handler for SIGINT, the signal generated by Ctrl+C.
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
# Sleep until a signal arrives; the handler above then exits the process.
signal.pause()

Code stolen from here

like image 42
Tuan Anh Hoang-Vu Avatar answered Oct 22 '22 02:10

Tuan Anh Hoang-Vu