 

multiprocessing.Queue deadlocks after "reader" process death

I've been playing with the multiprocessing package and noticed that a queue can deadlock on reading when:

  1. The "reader" process is using get with a timeout > 0:

    self.queue.get(timeout=3)
    
  2. The "reader" dies while get is blocking on the timeout.

After that, the queue is locked forever.
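The mechanics behind this, as I understand them (a simplified sketch of multiprocessing's Queue.get, not the literal stdlib source): get() acquires a reader lock that is shared across processes and only releases it in a finally block, so a reader killed while blocked inside get() leaves the lock held forever:

```python
# Simplified sketch of multiprocessing.Queue.get() -- not the actual
# stdlib source, just the relevant locking structure.
def get(self, block=True, timeout=None):
    if not self._rlock.acquire(block, timeout):  # lock shared by all readers
        raise Empty
    try:
        ...  # wait for and read one message from the pipe
    finally:
        self._rlock.release()  # never runs if the process is killed in the try
```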

Application demonstrating the problem

I create two child processes: "Worker" (putting into the queue) and "Receiver" (getting from the queue). The parent process also periodically checks whether its children are alive and starts a new child if needed.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname  # third-party module; sets the process name shown in ps
import time

class Receiver(multiprocessing.Process):
    ''' Reads from queue with 3 secs timeout '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            try:
                msg = self.queue.get(timeout=3)
                print '<<< `{}`, queue rlock: {}'.format(
                    msg, self.queue._rlock)
            except multiprocessing.queues.Empty:
                print '<<< EMPTY, Queue rlock: {}'.format(
                    self.queue._rlock)


class Worker(multiprocessing.Process):
    ''' Puts into queue with 1 sec sleep '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Worker')
        while True:
            time.sleep(1)
            print 'Worker: putting msg, Queue size: ~{}'.format(
                self.queue.qsize())
            self.queue.put('msg from Worker')


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    worker = Worker(queue)
    worker.start()

    receiver = Receiver(queue)
    receiver.start()

    while True:
        time.sleep(1)
        if not worker.is_alive():
            print 'Restarting worker'
            worker = Worker(queue)
            worker.start()
        if not receiver.is_alive():
            print 'Restarting receiver'
            receiver = Receiver(queue)
            receiver.start()

How the process tree looks in ps

bash
 \_ python queuetest.py
     \_ Worker
     \_ Receiver

Console output

$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver                        <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7

Is there any way to work around this? Using get_nowait combined with sleep seems like a workaround of sorts, but it does not read the data "as it comes".

System information

$ uname -sr
Linux 3.11.8-200.fc19.x86_64

$ python -V
Python 2.7.5

In [3]: multiprocessing.__version__
Out[3]: '0.70a1'

"it just works" solution

While writing this question I came up with a somewhat silly modification to the Receiver class:

class Receiver(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            time.sleep(1)
            while True:
                try:
                    msg = self.queue.get_nowait()
                    print '<<< `{}`, queue rlock: {}'.format(
                        msg, self.queue._rlock)
                except multiprocessing.queues.Empty:
                    print '<<< EMPTY, Queue rlock: {}'.format(
                        self.queue._rlock)
                    break

But it doesn't seem very good to me.

asked Nov 01 '22 by Michael

1 Answer

It's probably because the *not_empty.release()* in Queue.get() never happens (the process has already been killed). Did you try catching the TERM signal in Receiver and releasing the queue mutex before exiting?

answered Nov 15 '22 by neutrinus