 

Multiprocessing.Queue with huge data causes _wait_for_tstate_lock


An Exception is raised in threading._wait_for_tstate_lock when I transfer huge data between a Process and a Thread via multiprocessing.Queue.

My minimal working example looks a bit complex at first - sorry, I will explain. The original application loads a lot of (not so important) files into RAM. This is done in a separate process to save resources; the main GUI thread shouldn't freeze.

  1. The GUI starts a separate Thread to prevent the GUI event loop from freezing.

  2. This separate Thread then starts one Process which does the actual work.

a) The Thread instantiates a multiprocessing.Queue (be aware that this is a multiprocessing and not a threading Queue!).

b) This Queue is given to the Process so it can share data back with the Thread.

  3. The Process does some work (3 steps) and .put()s the result into the multiprocessing.Queue.

  4. When the Process ends, the Thread takes over again, collects the data from the Queue and stores it in its own attribute MyThread.result.

  5. The Thread tells the GUI main loop/thread to call a callback function when it has time for it.

  6. The callback function (MyWindow::callback_thread_finished()) gets the results from MyWindow.thread.result.

The problem is: if the data put into the Queue is too big, something happens that I don't understand - MyThread never ends. I have to cancel the application via Ctrl+C.

I got some hints from the docs, but my problem is that I did not fully understand the documentation. Still, I have the feeling that the key to my problem can be found there. Please see the two red warning boxes in "Pipes and Queues" (Python 3.5 docs). This is the full output:

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

This is the minimal working example:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process needs to be started from a separate thread
        # to prevent the main thread (which runs the GUI main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

Possible duplicate, but quite different and IMO without an answer for my situation: Thread._wait_for_tstate_lock() never returns.

Workaround

Using a Manager by modifying the queue creation line (line 22 in my example) to queue = multiprocessing.Manager().Queue() solves the problem. But I don't know why. My intention with this question is to understand what happens behind the scenes, not only to make my code work. I also don't really know what a Manager() is and whether it has other (problem-causing) implications.
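
For reference, here is a minimal GUI-free sketch of that workaround. As far as I understand, multiprocessing.Manager() starts a separate server process and Manager().Queue() returns a proxy to a queue living in that process, so the same put/join/get order no longer hangs:

#!/usr/bin/env python3
# GUI-free sketch of the Manager workaround (same put/join/get order as above).

import multiprocessing


def worker(queue):
    for i in range(3):
        queue.put((i, 'x' * 102048))


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    queue = manager.Queue()  # proxy to a queue in the manager's server process

    process = multiprocessing.Process(target=worker, args=(queue,))
    process.start()
    process.join()           # does not hang here

    while not queue.empty():
        i, data = queue.get()
        print('{} {}...'.format(i, data[:10]))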

asked May 27 '19 by buhtz


1 Answer

According to the second warning box in the documentation you are linking to, you can get a deadlock when you join a process before processing all items in the queue. So starting the process, immediately joining it, and only then processing the items in the queue is the wrong order of steps. You have to start the process, then receive the items, and only when all items are received can you call the join method. Define some sentinel value to signal that the process is finished sending data through the queue - None, for example, if that can't be a regular value you expect from the process.

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        # Receive items until the sentinel (None) arrives.
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()  # safe now: the queue has been fully drained
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)  # sentinel: no more data will follow
        print('MyProcess stoppd.')
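
If you want to see the effect the warning box describes without any GUI involved, here is a stripped-down sketch (the payload size is just an assumption, chosen to exceed the pipe buffer): joining the child before draining the queue blocks forever, because the child's queue feeder thread can only flush its buffered data into the pipe once the parent reads from it.

#!/usr/bin/env python3
# Stripped-down sketch of the deadlock described above: the child's queue
# feeder thread cannot flush the large item into the pipe until the parent
# reads it, so joining the child before draining the queue never returns.

import multiprocessing


def worker(queue):
    queue.put('x' * (10 * 1024 * 1024))  # much larger than the pipe buffer


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=worker, args=(queue,))
    process.start()

    # Wrong order (as in the question): this join() would block forever.
    # process.join()

    # Right order (as in the answer): drain the queue first, then join.
    data = queue.get()
    process.join()
    print(len(data))
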
answered Oct 21 '22 by BlackJack