I've been playing with the multiprocessing package and noticed that a queue can become deadlocked for reading when:
The "reader" process is blocked in get with a timeout > 0:
self.queue.get(timeout=3)
and the "reader" dies while get is blocking on that timeout.
After that, the queue is locked forever.
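The underlying issue can be reproduced with a bare multiprocessing.Lock: a process killed while holding the lock never releases it, so no one can acquire it again. A minimal sketch (the `holder` helper is hypothetical, not from the code below):

```python
import multiprocessing
import time

def holder(lock):
    # Acquire the lock and then "hang", simulating a reader blocked in get()
    lock.acquire()
    time.sleep(60)

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    p = multiprocessing.Process(target=holder, args=(lock,))
    p.start()
    time.sleep(0.5)   # give the child time to acquire the lock
    p.terminate()     # SIGTERM: the default handler dies without releasing
    p.join()
    # The lock is still held by the dead process; non-blocking acquire fails
    print(lock.acquire(False))  # False
```

multiprocessing semaphores have no kernel-level "undo on death", so the dead owner's acquire is never rolled back; the same applies to the `_rlock` inside `Queue.get()`.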
I create two child processes: "Worker" (putting into the queue) and "Receiver" (getting from the queue). The parent process periodically checks whether its children are alive and starts a new child if needed.
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname  # third-party module for setting the process title
import time


class Receiver(multiprocessing.Process):
    ''' Reads from queue with 3 secs timeout '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            try:
                msg = self.queue.get(timeout=3)
                print '<<< `{}`, queue rlock: {}'.format(
                    msg, self.queue._rlock)
            except multiprocessing.queues.Empty:
                print '<<< EMPTY, Queue rlock: {}'.format(
                    self.queue._rlock)


class Worker(multiprocessing.Process):
    ''' Puts into queue with 1 sec sleep '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Worker')
        while True:
            time.sleep(1)
            print 'Worker: putting msg, Queue size: ~{}'.format(
                self.queue.qsize())
            self.queue.put('msg from Worker')


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    worker = Worker(queue)
    worker.start()

    receiver = Receiver(queue)
    receiver.start()

    while True:
        time.sleep(1)
        if not worker.is_alive():
            print 'Restarting worker'
            worker = Worker(queue)
            worker.start()
        if not receiver.is_alive():
            print 'Restarting receiver'
            receiver = Receiver(queue)
            receiver.start()
The process tree looks like this:

bash
 \_ python queuetest.py
     \_ Worker
     \_ Receiver
$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7
Is there any way to work around this? Using get_nowait combined with sleep seems to be a workaround of sorts, but it does not read the data as it arrives.
$ uname -sr
Linux 3.11.8-200.fc19.x86_64
$ python -V
Python 2.7.5
In [3]: multiprocessing.__version__
Out[3]: '0.70a1'
While writing this question I came up with some silly modification to Receiver class:
class Receiver(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            time.sleep(1)
            while True:
                try:
                    msg = self.queue.get_nowait()
                    print '<<< `{}`, queue rlock: {}'.format(
                        msg, self.queue._rlock)
                except multiprocessing.queues.Empty:
                    print '<<< EMPTY, Queue rlock: {}'.format(
                        self.queue._rlock)
                    break
But it doesn't seem very good to me.
It's probably because the lock release in the *finally* clause of *Queue.get()* never happens: the process was killed while holding the queue's read lock (`_rlock`), so the lock stays acquired forever. Did you try to catch the TERM signal in Receiver and exit cleanly, so the lock is released before the process dies?
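A sketch of that idea, assuming SIGTERM is the only signal used to stop the receiver (the `receiver` function here is a simplified stand-in for the Receiver class above): installing a handler that calls sys.exit() makes SystemExit propagate out of queue.get(), whose internal finally clause releases the read lock on the way out.

```python
import multiprocessing
import multiprocessing.queues
import signal
import sys
import time

def receiver(queue):
    # sys.exit() raises SystemExit inside queue.get(); get()'s internal
    # finally clause then releases the read lock before the process exits.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
    while True:
        try:
            queue.get(timeout=3)
        except multiprocessing.queues.Empty:
            pass

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=receiver, args=(queue,))
    p.start()
    time.sleep(0.5)   # let the child block inside get()
    p.terminate()     # SIGTERM -> SystemExit -> read lock released
    p.join()
    # The queue is still usable from other processes afterwards
    queue.put('still works')
    print(queue.get(timeout=3))
```

This relies on POSIX signal semantics (p.terminate() sends SIGTERM), so it does not apply on Windows, where terminate() kills the process outright.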