Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

multiprocessing GUI schemas to combat the "Not Responding" blocking

What are the best ways to create a multiprocessing/ GUI coding system?

I would like to create a place for the internet community to come and find examples on how to use the multiprocessing module in python.

I have seen several small examples of multiprocessing processes on the internet of simple global functions which are called in a main module, but I have found that this rarely translates easily into anything that anyone actually does with regard to GUIs. I would think that many programs would have the functions which they want to use in a separate process as methods of objects (which may be aggregates of other objects etc.) and perhaps a single GUI element would have an associated object that needs to call this process, etc.

For example, I have a relatively complex program and I am having problems getting a responsive GUI for it, which I believed to be due to my lack of understanding of multiprocessing and threading with QThread. However, I do know that the example given below will at least pass information between processes in the manner I desire (since the print statements execute), but my GUI is still locking up. Does anyone know what may be causing this, and whether it is still a problem with my lack of understanding of multithreaded/multiprocessing architectures?

Here is a small pseudo code example of what I am doing:

class Worker:
    ...
    def processing(self, queue):
        # put stuff into queue in a loop

# This thread gets data from Worker
class Worker_thread(QThread):
    def __init__(self):
        ...
        # make process with Worker inside
    def start_processing(self):
        # continuously get data from Worker
        # send data to Tab object with signals/slots

class Tab(QTabWidget):
    # spawn a thread separate from main GUI thread

    # update GUI using slot
    def update_GUI()

And this code is a fully runnable example which embodies the overall structure of my program:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

# This object can hold several properties which will be used for the processing.
# Its processing method is meant to run in the background and to report all of
# its progress through a queue.
class Worker:
    """Background job that reports its progress through a queue."""

    def __init__(self, some_var):
        # some_var is only stored; nothing in this class reads it back.
        self.iteration = 0
        self.some_var = some_var

    def some_complex_processing(self, queue):
        """Put the running iteration count on ``queue`` 5000 times.

        After the last count a final marker string is put on the queue so
        the consumer knows that nothing else will follow.
        """
        remaining = 5000
        while remaining:
            self.iteration += 1
            queue.put(self.iteration)
            remaining -= 1
        queue.put('done with processing')
# This Worker_thread is a thread which will spawn a separate process (Worker).
# This separation is needed in order to separate the data retrieval
# from the main GUI thread, which should only quickly update when needed
class Worker_thread(QtCore.QThread):
    """QThread subclass that owns a worker process and relays its output.

    Values read from the process queue are forwarded through
    ``update_signal``; ``done_signal`` is emitted once the end marker is read.

    NOTE: ``start_computation`` is a blocking loop, so whichever thread
    executes that slot stays busy until the worker reports completion.
    """
    # signals and slots are used to communicate back to the main GUI thread
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, parent, worker):
        """Create the queue and the (not yet started) worker process.

        parent -- Qt parent object, also stored as ``self.parent``
        worker -- object providing ``some_complex_processing(queue)``
        """
        QtCore.QThread.__init__(self, parent)
        self.queue = mp.Queue()
        self.worker = worker
        self.parent = parent
        self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,))

    # When the process button is pressed, this function will start getting data from Worker
    # this data is then retrieved by the queue and pushed through a signal
    # to Tab.update_GUI
    @QtCore.pyqtSlot()
    def start_computation(self):
        """Start the worker process and forward its queue output as signals."""
        self.process.start()
        while True:
            try:
                message = self.queue.get()
            except EOFError:
                # Nothing was received, so there is no message to inspect yet.
                continue
            # Check for the end marker *before* emitting: update_signal is
            # declared with an int argument and must never carry the string.
            if message == 'done with processing':
                self.done_signal.emit()
                break
            self.update_signal.emit(message)
        self.process.join()

# Each tab will start its own thread, which will spawn a process
class Tab(QtGui.QTabWidget):
    """Tab showing the iteration count reported by one worker.

    The constructor builds a small tree widget, creates a ``Worker_thread``
    for ``this_worker``, wires its signals to this tab and starts the thread.
    """
    # Emitted by start_signal_emit(); connected to Worker_thread.start_computation.
    start_comp = QtCore.pyqtSignal()

    def __init__(self, parent, this_worker):
        # Initialise the Qt base class before touching the instance.
        QtGui.QTabWidget.__init__(self, parent)
        self.parent = parent
        self.this_worker = this_worker

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        self.thread = Worker_thread(parent=self, worker=self.this_worker)
        self.thread.update_signal.connect(self.update_GUI)
        self.thread.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.thread.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        """Show ``iteration`` in the tree item and echo it to the console."""
        self.step.setText(0, str(iteration))
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(iteration)

    def start_signal_emit(self):
        """Emit ``start_comp`` to request the start of the computation."""
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    """Main window with a "Process" button above a tab widget of ten tabs."""

    def __init__(self, parent = None):
        # Forward parent to Qt; previously the argument was silently ignored.
        QtGui.QMainWindow.__init__(self, parent)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            tab = Tab(self.tabWidget, Worker(name))
            self.tab_list.append(tab)
            self.tabWidget.addTab(tab, name)

    # Do the processing
    def process(self):
        """Ask every tab to start its computation."""
        for tab in self.tab_list:
            tab.start_signal_emit()

if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # pass the event loop's return code on to sys.exit().
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    exit_code = app.exec_()
    sys.exit(exit_code)

More Information: I am writing a program which I would like to spawn several processes from and have them continuously show their progress throughout their processing. I would like the program to be multiprocessed in order to get the best speed out of the program as possible.

At the moment, I am trying to use a thread to spawn a process and use signals and slots to update the GUI while the data is continuously retrieved from a queue. It appears that the queues, signals, and slots work when using print statements, but they cannot update the GUI. If anyone has any other suggestions as to how I should structure this in order to keep the program more manageable, I would like to learn.

EDIT: I have made the adjustments put forth by Min Lin, with the addition of making Worker a QObject so that moveToThread() would work.
Here is the new code I have at the moment:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

class Worker(QtCore.QObject):
    """QObject that runs a computation in a child process and relays progress.

    Integer progress values read from the queue are emitted through
    ``update_signal``; ``done_signal`` is emitted when the end marker arrives.

    NOTE(review): the process target is a bound method of this QObject. Where
    multiprocessing has to pickle the target (e.g. on Windows) this is
    expected to fail -- verify on the target platform.
    """
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.iteration = 0
        self.queue = mp.Queue()
        self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))

    def some_complex_processing(self, queue):
        """Put 5000 increasing iteration counts on ``queue``, then the end marker."""
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

    @QtCore.pyqtSlot()
    def start_computation(self):
        """Start the child process and forward its queue output as signals."""
        self.process.start()
        while True:
            try:
                message = self.queue.get()
            except EOFError:
                # Nothing was received, so there is no message to inspect yet.
                continue
            # The end marker is a string, while update_signal is declared with
            # an int argument, so it has to be filtered out before emitting.
            if message == 'done with processing':
                self.done_signal.emit()
                break
            self.update_signal.emit(message)
        self.process.join()



class Tab(QtGui.QTabWidget):
    """Tab showing the iteration count reported by one worker object.

    The worker is moved to a dedicated ``QThread`` owned by this tab, and its
    signals are connected to this tab's slots.
    """
    # Emitted by start_signal_emit(); connected to the worker's start_computation.
    start_comp = QtCore.pyqtSignal()

    def __init__(self, parent, this_worker):
        # Initialise the Qt base class before touching the instance.
        QtGui.QTabWidget.__init__(self, parent)
        self.parent = parent
        self.this_worker = this_worker

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread()
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread)
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        """Show ``iteration`` in the tree item and echo it to the console."""
        self.step.setText(0, str(iteration))
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(iteration)

    def start_signal_emit(self):
        """Emit ``start_comp`` to request the start of the computation."""
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    """Top-level window: one "Process" button and ten worker tabs."""

    def __init__(self, parent = None):
        # The parent argument used to be dropped; hand it on to Qt.
        QtGui.QMainWindow.__init__(self, parent)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            new_tab = Tab(self.tabWidget, Worker(name))
            self.tab_list.append(new_tab)
            self.tabWidget.addTab(new_tab, name)

    # Do the processing
    def process(self):
        """Slot for the "Process" button: start the computation of every tab."""
        for tab in self.tab_list:
            tab.start_signal_emit()

if __name__ == "__main__":
    # Only runs when the file is executed as a script, not when it is imported.
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    # exec_() blocks in the Qt event loop; its return value is the exit code.
    exit_code = app.exec_()
    sys.exit(exit_code)

Thank you for all of the answers. I appreciate the level of detail that everyone has gone into in describing the idea they believe to be the solution, but unfortunately I have not yet been able to perform these types of processes which operate on the object they belong to while displaying the object's attributes on a GUI.
However, I have learned a decent amount from this post, which allowed me to realize that the threaded version I have at the moment is hanging the GUI since the GUI update function is too large and takes too much processing.

So, I have taken the QTimer() approach to my multi-threaded version and it is performing much better! I would advise anyone facing similar problems to at least attempt something similar to this.

I was unaware of this approach to solving GUI update problems, and it is now a pseudo or temporary fix to the problem I am facing.

like image 317
chase Avatar asked Mar 29 '13 06:03

chase


3 Answers

A GUI application is perfect for testing stuff, as it is easy to spawn new tasks and visualize what is going on, so I wrote a little example app (screenshot; code is below), as I wanted to learn it for myself.

At first, I took a similar approach to yours, trying to implement the Consumer/Producer pattern, and I struggled with background processes running endless loops to wait for new jobs, and handled the communication back and forth myself. Then I found out about the Pool interface, and I could replace all that hideous code with just a few lines. All you need is a single pool and a few callbacks:

#!/usr/bin/env python3
import multiprocessing, time, random, sys
from PySide.QtCore import * # equivalent: from PyQt4.QtCore import *
from PySide.QtGui import *   # equivalent: from PyQt4.QtGui import *

def compute(num):
    """Simulated task: fail at random, otherwise sleep and return ``num``.

    A number from 1 to 6 is drawn. An even draw raises an exception whose
    message ends in ``_<num>``; an odd draw sleeps that many seconds first.
    """
    print("worker() started at %d" % num)
    delay = random.randint(1, 6)
    if delay % 2 == 0:
        raise Exception('Random Exception in _%d' % num)
    time.sleep(delay)
    return num

class MainWindow(QMainWindow):
    """Main window listing one entry per task submitted to a process pool.

    ``apply_async`` invokes its callbacks on the pool's result-handler
    thread, not on the GUI thread. The callbacks therefore only emit the
    signals below; the connected slots then update the widgets.
    """
    # With PyQt4, spell these ``pyqtSignal`` instead of ``Signal``.
    resultReady = Signal(int)      # carries the row number of a finished task
    taskFailed = Signal(object)    # carries the exception raised by a task

    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.list = QListWidget()
        self.setCentralWidget(self.list)

        self.resultReady.connect(self.receiveResult)
        self.taskFailed.connect(self.receiveException)

        # Pool of Background Processes
        self.pool = multiprocessing.Pool(processes=4)

    def _submit(self, num_row):
        """Queue ``compute(num_row)`` on the pool; results come back as signals."""
        self.pool.apply_async(func=compute, args=(num_row,),
                              callback=self.resultReady.emit,
                              error_callback=self.taskFailed.emit)

    def addTask(self):
        """Append a gray list item for a new task and submit the task."""
        num_row = self.list.count()
        item = QListWidgetItem("item %d" % num_row)
        item.setForeground(Qt.gray)
        # Add the item before submitting, so that a task which fails
        # immediately always finds its row in the list.
        self.list.addItem(item)
        self._submit(num_row)

    def receiveResult(self, result):
        """Mark the row of a successfully finished task in dark green."""
        assert isinstance(result, int)
        print("end_work(), where result is %s" % result)
        self.list.item(result).setForeground(Qt.darkGreen)

    def receiveException(self, exception):
        """Mark the failed row in dark red and submit the task again.

        The row number is parsed from the text after the first underscore
        of the exception message, as produced by ``compute``.
        """
        error = str(exception)
        _pos = error.find('_') + 1
        num_row = int(error[_pos:])
        item = self.list.item(num_row)
        item.setForeground(Qt.darkRed)
        item.setText(item.text() + ' Retry...')
        self._submit(num_row)

if __name__ == '__main__':
    # Script entry point: start Qt with the command-line arguments, show the
    # window and exit with the event loop's return code.
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)

Edit: I did another example using a QTimer instead of Callbacks, checking periodically for Entries in a Queue, updating a QProgressBar:

#!/usr/bin/env python3
import multiprocessing, multiprocessing.pool, time, random, sys
from PySide.QtCore import *
from PySide.QtGui import *

def compute(num_row, step_seconds=1):
    """Simulated task that reports its progress through ``compute.queue``.

    A step count from 1 to 10 is drawn at random. Before each step a
    ``(num_row, percent)`` tuple is put on the queue, followed by a pause;
    a final ``(num_row, 100)`` marks completion.

    num_row      -- identifier sent back with every progress report
    step_seconds -- pause after each report, in seconds (default 1)

    ``compute.queue`` must have been assigned beforehand (see ``pool_init``).
    """
    print("worker started at %d" % num_row)
    random_number = random.randint(1, 10)
    for second in range(random_number):
        progress = float(second) / float(random_number) * 100
        compute.queue.put((num_row, progress,))
        time.sleep(step_seconds)
    compute.queue.put((num_row, 100))

def pool_init(queue):
    """Pool initializer: attach ``queue`` to ``compute`` as a function attribute."""
    # see http://stackoverflow.com/a/3843313/852994
    setattr(compute, 'queue', queue)

class MainWindow(QMainWindow):
    """Main window showing a status label and a progress bar per pooled task.

    Worker processes report progress through a shared queue; a timer polls
    that queue and updates the table.
    """

    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.table = QTableWidget()
        self.table.verticalHeader().hide()
        self.table.setColumnCount(2)
        self.setCentralWidget(self.table)

        # Pool of Background Processes
        self.queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,))

        # Check for progress periodically
        self.timer = QTimer()
        self.timer.timeout.connect(self.updateProgress)
        self.timer.start(2000)

    def addTask(self):
        """Submit a new task and add a table row (label + progress bar) for it."""
        num_row = self.table.rowCount()
        self.pool.apply_async(func=compute, args=(num_row,))
        label = QLabel("Queued")
        bar = QProgressBar()
        bar.setValue(0)
        self.table.setRowCount(num_row + 1)
        self.table.setCellWidget(num_row, 0, label)
        self.table.setCellWidget(num_row, 1, bar)

    def updateProgress(self):
        """Apply every progress report currently waiting in the queue."""
        # A loop instead of recursion, so a long backlog of reports cannot
        # exhaust the interpreter's recursion limit.
        while not self.queue.empty():
            num_row, progress = self.queue.get() # unpack
            print("received progress of %s at %s" % (progress, num_row))
            label = self.table.cellWidget(num_row, 0)
            bar = self.table.cellWidget(num_row, 1)
            # Intermediate reports are floats; the progress bar takes an int.
            bar.setValue(int(progress))
            if progress == 100:
                label.setText('Finished')
            elif label.text() == 'Queued':
                label.setText('Downloading')

if __name__ == '__main__':
    # Only runs when executed as a script: start Qt, show the window and
    # exit with the event loop's return code.
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
like image 94
valmynd Avatar answered Nov 08 '22 00:11

valmynd


Thank you so much for posting this question, and all contributors for their input. It provided a useful scaffolding for me to experiment with PyQt and multiprocessing.

I started with the second code sample listed in the question. My changes and comments:

  • On Windows, all arguments to Process.__init__() must be pickleable. You will see that @valmynd makes his/her compute function a top-level function, for this very reason. This is partly because multiprocessing will reimport the module containing the target function. To remind myself of this, I try to put the target function into its own module (and make sure any information is passed as pickleable arguments). I have moved the complex processing function into its own module called workermodule.py.
  • Without sufficient work in the complex processing function, the loop finishes too quickly to allow any change to be visible in the GUI. Therefore, I added some extra (useless) work inside the complex processing function. As noted in the comment, you could time.sleep, but it is more satisfying to light up all the cores for a bit.
  • With the following two code snippets, I get a smooth GUI with continuous updates to the iteration value, and the sub-processes run at full speed.
  • Note that the self.process could be created with two queues as arguments, one for input and one for output. Then the complex processing function would have to periodically check the input queue for data.

workermodule.py:

import time

def some_complex_processing(queue, iterations=5000, busy_loops=100000):
    """Report ``iterations`` progress counts on ``queue``, then an end marker.

    queue      -- object with a ``put`` method that receives the counts
    iterations -- number of counts (1, 2, ...) to report (default 5000)
    busy_loops -- length of the do-nothing loop run after each report to
                  keep the process busy (default 100000)

    The last item put on the queue is the string 'done with processing'.
    """
    iteration = 0
    for i in range(iterations):
        iteration += 1
        queue.put(iteration)
        # You could time.sleep here (e.g. time.sleep(20e-3)) to simulate a
        # long-running process, but just to prove
        # that we're not cheating, let's make this
        # process work hard, while the GUI process
        # should still have smooth drawing.
        for x in range(busy_loops):
            y = x
    queue.put('done with processing')

mainfile.py:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import sys
import workermodule

class Worker(QtCore.QObject):
    """QObject that runs ``workermodule.some_complex_processing`` in a child
    process and relays what the process puts on the queue as Qt signals.
    """
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.queue = mp.Queue()
        # The target is a module-level function and the queue is its only
        # argument.
        self.process = mp.Process(
            target=workermodule.some_complex_processing,
            args=(self.queue,)
            )

    @QtCore.pyqtSlot()
    def start_computation(self):
        """Start the child process and forward its queue output as signals.

        Integer values are emitted through ``update_signal``; the end marker
        triggers ``done_signal`` and ends the loop.
        """
        self.process.start()
        while True:
            try:
                message = self.queue.get()
            except EOFError:
                # Nothing was received, so there is no message to inspect yet.
                continue
            # The end marker is a string and update_signal is declared with
            # an int argument, so test for the marker before emitting.
            if message == 'done with processing':
                self.done_signal.emit()
                break
            self.update_signal.emit(message)
        self.process.join()

class Tab(QtGui.QTabWidget):
    """Tab showing the iteration count reported by one worker object.

    The worker is moved to a ``QThread`` owned by this tab and its signals
    are connected to this tab's slots.
    """
    # Emitted by start_signal_emit(); connected to the worker's start_computation.
    start_comp = QtCore.pyqtSignal()

    def __init__(self, parent, this_worker):
        # Initialise the Qt base class before touching the instance.
        QtGui.QTabWidget.__init__(self, parent)
        self.parent = parent
        self.this_worker = this_worker

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread()
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread)
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        """Show ``iteration`` in the tree item and echo it to the console."""
        self.step.setText(0, str(iteration))
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(iteration)

    def start_signal_emit(self):
        """Emit ``start_comp`` to request the start of the computation."""
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    """Top-level window holding a "Process" button and ten worker tabs."""

    def __init__(self, parent = None):
        # Pass parent through to Qt instead of discarding it.
        QtGui.QMainWindow.__init__(self, parent)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            created = Tab(self.tabWidget, Worker(name))
            self.tab_list.append(created)
            self.tabWidget.addTab(created, name)

    # Do the processing
    def process(self):
        """Slot for the "Process" button: start the computation of every tab."""
        for tab in self.tab_list:
            tab.start_signal_emit()

if __name__ == "__main__":
    # Runs only when this file is executed as a script; the GUI is therefore
    # not started when the module is merely imported.
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
like image 24
Caleb Hattingh Avatar answered Nov 08 '22 00:11

Caleb Hattingh


Everything you have done in Worker_thread should be moved to Worker. Qt assigns thread affinity to each object based on where the object is created. The Worker_thread object is created in the main thread, so it has main-thread affinity. If a signal from the main thread is connected to a slot of an object created in the main thread, the slot will also be executed in the main thread (regardless of whether the connection is a QueuedConnection or a DirectConnection), and the slot blocks the GUI.

Do this:

class Worker(QtCore.QObject):
    """QObject that runs a computation in a child process and relays progress.

    The class has to derive from ``QObject``: it declares Qt signals and is
    handed to ``moveToThread()`` by ``Tab``.

    NOTE(review): the process target is a bound method of this QObject. Where
    multiprocessing has to pickle the target (e.g. on Windows) this is
    expected to fail -- verify on the target platform.
    """
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.iteration = 0
        self.queue = mp.Queue()
        self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))

    def some_complex_processing(self, queue):
        """Put 5000 increasing iteration counts on ``queue``, then the end marker."""
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

    @QtCore.pyqtSlot()
    def start_computation(self):
        """Start the child process and forward its queue output as signals."""
        self.process.start()
        while True:
            try:
                message = self.queue.get()
            except EOFError:
                # Nothing was received, so there is no message to inspect yet.
                continue
            # The end marker is a string, while update_signal is declared with
            # an int argument, so it has to be filtered out before emitting.
            if message == 'done with processing':
                self.done_signal.emit()
                break
            self.update_signal.emit(message)
        self.process.join()



class Tab(QtGui.QTabWidget):
    """Tab showing the iteration count reported by one worker object.

    The worker is moved to a ``QThread`` owned by this tab and its signals
    are connected to this tab's slots.
    """
    # Emitted by start_signal_emit(); connected to the worker's start_computation.
    start_comp = QtCore.pyqtSignal()

    def __init__(self, parent, this_worker):
        # Initialise the Qt base class before touching the instance.
        QtGui.QTabWidget.__init__(self, parent)
        self.parent = parent
        self.this_worker = this_worker

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread()
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread)
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Slot that updates the GUI at every iteration of Worker.some_complex_processing()
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        """Show ``iteration`` in the tree item and echo it to the console."""
        self.step.setText(0, str(iteration))
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(iteration)

    def start_signal_emit(self):
        """Emit ``start_comp`` to request the start of the computation."""
        self.start_comp.emit()
like image 3
Min Lin Avatar answered Nov 08 '22 00:11

Min Lin