What are the best ways to create a multiprocessing/ GUI coding system?
I would like to create a place for the internet community to come and find examples on how to use the multiprocessing
module in python.
I have seen several small examples of multiprocessing
processes on the internet of simple global functions which are called in a main module, but I have found that this rarely translates easily into anything that anyone actually does with regard to GUIs. I would think that many programs would have the functions which they want to use in a separate process as methods of objects (which may be aggregates of other objects etc.) and perhaps a single GUI element would have an associated object that needs to call this process, etc.
For example, I have a relatively complex program and I am having problems in getting a responsive GUI for it, which I believed to be due to my lack of understanding in multiprocessing
and threading with QThread
. However, I do know that the example given below will at least pass information between processes in the manner I desire (due to being able to execute print
statements) but my GUI is still locking. Does anyone know what may be causing this, and if it is still a probelm with my lack of understanding in mutlithreaded/multiprocessing architectures?
Here is a small pseudo code example of what I am doing:
class Worker:
...
def processing(self, queue):
# put stuff into queue in a loop
# This thread gets data from Worker
class Worker_thread(QThread):
def __init__(self):
...
# make process with Worker inside
def start_processing(self):
# continuously get data from Worker
# send data to Tab object with signals/slots
class Tab(QTabWidget):
# spawn a thread separate from main GUI thread
# update GUI using slot
def update_GUI()
And this code is fully compilable example which embodies the overlying sturcture of my program:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time
# This object can hold several properties which will be used for the processing
# and will be run in the background, while it updates a thread with all of it's progress
class Worker:
def __init__(self, some_var):
self.some_var = some_var
self.iteration = 0
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
# This Woker_thread is a thread which will spawn a separate process (Worker).
# This separate is needed in order to separate the data retrieval
# from the main GUI thread, which should only quickly update when needed
class Worker_thread(QtCore.QThread):
# signals and slots are used to communicate back to the main GUI thread
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, parent, worker):
QtCore.QThread.__init__(self, parent)
self.queue = mp.Queue()
self.worker = worker
self.parent = parent
self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,))
# When the process button is pressed, this function will start getting data from Worker
# this data is then retrieved by the queue and pushed through a signal
# to Tab.update_GUI
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
#self.parent.update_GUI(message)
self.process.join()
return
# Each tab will start it's own thread, which will spawn a process
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
self.thread = Worker_thread(parent=self, worker=self.this_worker)
self.thread.update_signal.connect(self.update_GUI)
self.thread.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.thread.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
More Information: I am writing a program which I would like to spawn several processes from and have them continuously show their progress throughout their processing. I would like the program to be multiprocessed in order to get the best speed out of the program as possible.
At the moment, I am trying to use a thread to spawn a process and use signals and slots to update the GUI while the data is continuously retrieved by a queue. It appears that the queues
, signals
, and slots
work when using print
statements, but can not update the GUI. If anyone has any other suggestions as to how I should structure this in order to keep the program more managable, I would like to learn.
EDIT: I have made the adjustments put forth by Min Lin, with the addition of making Worker
a QObject
so that moveToThread()
would work.
Here is the new code I have at the moment:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time
class Worker(QtCore.QObject):
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
QtCore.QObject.__init__(self, parent=None)
self.some_var = some_var
self.iteration = 0
self.queue = mp.Queue()
self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
Thank you for all of the answers, I appreciate the level of detail that everyone has gone into in describing the idea they believe to be solution, but unfortunately I have not yet been able to perform these types of processes which operate on the object they belong to while displaying the object's attribute on a GUI.
However, I have learned a decent amount from this post, which allowed me to realize that the threaded version I have at the moment is hanging the GUI since the GUI update function is too large and takes too much processing.
So, I have taken the QTimer()
approach to my multi-threaded version and it is performing much better! I would advise anyone facing similar problems to at least attempt something similar to this.
I was unaware of this approach to solving GUI update problems, and it is now a pseudo or temporary fix to the problem I am facing.
A GUI application is perfect for testing stuff, as it is easy to spawn new tasks and visualize what is going on, so I wrote a little example app (Screenshot, code is below) as I did want to learn it for my self.
At first, i took a similar approach as yours, trying to implement the Consumer/Producer pattern and I struggeled with Background Processes doing endless loops to wait for new jobs and took care of communication back and forth for myself. Then I found out about the Pool Interface and then I could replace all that hidious code with just a few lines. All you need is a single pool and a few callbacks:
#!/usr/bin/env python3
import multiprocessing, time, random, sys
from PySide.QtCore import * # equivalent: from PyQt4.QtCore import *
from PySide.QtGui import * # equivalent: from PyQt4.QtGui import *
def compute(num):
print("worker() started at %d" % num)
random_number = random.randint(1, 6)
if random_number in (2, 4, 6):
raise Exception('Random Exception in _%d' % num)
time.sleep(random_number)
return num
class MainWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.toolBar = self.addToolBar("Toolbar")
self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
self.list = QListWidget()
self.setCentralWidget(self.list)
# Pool of Background Processes
self.pool = multiprocessing.Pool(processes=4)
def addTask(self):
num_row = self.list.count()
self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
error_callback=self.receiveException)
item = QListWidgetItem("item %d" % num_row)
item.setForeground(Qt.gray)
self.list.addItem(item)
def receiveResult(self, result):
assert isinstance(result, int)
print("end_work(), where result is %s" % result)
self.list.item(result).setForeground(Qt.darkGreen)
def receiveException(self, exception):
error = str(exception)
_pos = error.find('_') + 1
num_row = int(error[_pos:])
item = self.list.item(num_row)
item.setForeground(Qt.darkRed)
item.setText(item.text() + ' Retry...')
self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
error_callback=self.receiveException)
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec_())
Edit: I did another example using a QTimer instead of Callbacks, checking periodically for Entries in a Queue, updating a QProgressBar:
#!/usr/bin/env python3
import multiprocessing, multiprocessing.pool, time, random, sys
from PySide.QtCore import *
from PySide.QtGui import *
def compute(num_row):
print("worker started at %d" % num_row)
random_number = random.randint(1, 10)
for second in range(random_number):
progress = float(second) / float(random_number) * 100
compute.queue.put((num_row, progress,))
time.sleep(1)
compute.queue.put((num_row, 100))
def pool_init(queue):
# see http://stackoverflow.com/a/3843313/852994
compute.queue = queue
class MainWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.toolBar = self.addToolBar("Toolbar")
self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
self.table = QTableWidget()
self.table.verticalHeader().hide()
self.table.setColumnCount(2)
self.setCentralWidget(self.table)
# Pool of Background Processes
self.queue = multiprocessing.Queue()
self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,))
# Check for progress periodically
self.timer = QTimer()
self.timer.timeout.connect(self.updateProgress)
self.timer.start(2000)
def addTask(self):
num_row = self.table.rowCount()
self.pool.apply_async(func=compute, args=(num_row,))
label = QLabel("Queued")
bar = QProgressBar()
bar.setValue(0)
self.table.setRowCount(num_row + 1)
self.table.setCellWidget(num_row, 0, label)
self.table.setCellWidget(num_row, 1, bar)
def updateProgress(self):
if self.queue.empty(): return
num_row, progress = self.queue.get() # unpack
print("received progress of %s at %s" % (progress, num_row))
label = self.table.cellWidget(num_row, 0)
bar = self.table.cellWidget(num_row, 1)
bar.setValue(progress)
if progress == 100:
label.setText('Finished')
elif label.text() == 'Queued':
label.setText('Downloading')
self.updateProgress() # recursion
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec_())
Thank you so much for posting this question, and all contributors for their input. It provided a useful scaffolding for me to experiment with PyQt and multiprocessing.
I started with the second code sample listed in the question. My changes and comments:
Process.__init__()
must be pickleable. You will see that @valmynd makes his/her compute
function a top-level function, for this very reason. This is partly because multiprocessing will reimport the module containing the target function. To remind myself of this, I try to put the target function into its own module (and make sure any information is passed as pickleable arguments). I have moved the complex processing function into its own module called workermodule.py
.time.sleep
, but it is more satisfying to light up all the cores for a bit.self.process
could be created with two queues as arguments, one for input and one for output. Then the complex processing function would have to periodically check the input queue for data.workermodule.py:
import time
def some_complex_processing(queue):
iteration = 0
for i in range(0,5000):
iteration += 1
queue.put(iteration)
#time.sleep(20e-3) # ms
# You could time.sleep here to simulate a
# long-running process, but just to prove
# that we're not cheating, let's make this
# process work hard, while the GUI process
# should still have smooth drawing.
for x in range(100000):
y = x
queue.put('done with processing')
mainfile.py:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import sys
import workermodule
class Worker(QtCore.QObject):
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
QtCore.QObject.__init__(self, parent=None)
self.some_var = some_var
self.queue = mp.Queue()
self.process = mp.Process(
target=workermodule.some_complex_processing,
args=(self.queue,)
)
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while True:
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
All you have done in the Worker_Thread should have been moved to the Worker. Qt assigns thread affinity to each of the objects based on where the object is created. Worker_Thread objects is created in the main thread, so it has main thread affinity. If a signal from main thread is connected to a slot of a object created in main thread, the slot will also be executed in the main thread. (No matter it's QueuedConnection or DirectConnection). And the slot blocks the GUI.
Do this:
class Worker:
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
self.some_var = some_var
self.iteration = 0
self.queue = mp.Queue()
self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With