Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement multicore processing in Python 3 and PyQt5?

Background: I am trying to implement multicore processing in a python 3.4 PyQT5 application.

In the application I have numpy.ndarrays of frames; picture an array of shape [n,m,t]. I need to process each [n,m] frame (one per index along t) independently, so spreading the work over several cores should speed the processing up roughly linearly.

I've played with multiprocessing and put together a simple script from parts of example scripts, which gave me the following idea:

Simple No GUI code:

import multiprocessing as mp
import numpy

aa = numpy.random.rand(4,2,3)

def random_function(x):
    """Return *x* together with its cube as the tuple ``(x, x**3)``."""
    cubed = x ** 3
    return x, cubed

if __name__ == '__main__':
    # The context manager shuts the worker processes down when the block is
    # left instead of leaving the pool open for the rest of the run.
    with mp.Pool(processes=4) as pool:
        # Pool.apply() blocks until its single result is ready, so calling it
        # in a loop processes the slices one after another. apply_async()
        # queues every slice first; the results are then collected in
        # submission order, so the output layout is unchanged.
        pending = [pool.apply_async(random_function, args=(aa[:, :, x],))
                   for x in range(aa.shape[2])]
        # All results are fetched before the pool is torn down.
        results = [job.get() for job in pending]

    test_va = numpy.asarray(results)

This works and does about what I need it to do.

Issue: Now, when I implement this in PyQt5, I run into the "pickling" problem. So, following suggestions for PyQt4 here, I made a simple GUI that spawns a thread and uses multiprocessing. As a result, I get the same GUI replicated 4 times, and it just does not seem to work.

PyQT5 GUI non working code:

import sys, time
from PyQt5.QtCore import * 
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *

import multiprocessing as mp
import numpy

class WorkThread(QThread):
    """Thread that cubes slices of a random array in a process pool.

    ``run`` builds a random 4x2x3 array, submits every ``aa[:, :, x]`` slice
    to a 4-process pool, and then emits ``finished`` five times with the
    loop index and the collected results.
    """
    # Signal carrying (loop index, result array).
    # NOTE(review): the name presumably shadows QThread's own ``finished``
    # signal -- confirm that this is intended.
    finished = pyqtSignal(int,object)


    def __del__(self):
      # Calls wait() on the thread when the Python object is collected.
      self.wait()

    # NOTE(review): declared without ``self`` but looked up as ``self.cube``
    # in run(), so what is handed to the pool is a method bound to this
    # thread object. Presumably this is the source of the "pickling" problem
    # described in the question -- TODO confirm.
    def cube(x):
        return x,x**3

    def run(self):
        """Compute the cubes in a process pool, then emit the results."""
        aa = numpy.random.rand(4,2,3)

        # The pool is created on every run and is not closed in this method.
        pool = mp.Pool(processes=4)
        # One asynchronous task per index along the last axis of ``aa``.
        results = [pool.apply_async(self.cube, args=(aa[:,:,x],)) for x in range(0,aa.shape[2])]
        # get() blocks until the corresponding task has finished.
        output = [p.get() for p in results]
        test_va = numpy.asarray( output)

        for i in range(5):

            # NOTE(review): QThread.sleep is documented in whole seconds;
            # 0.3 is a float -- confirm this delays as intended.
            QThread.sleep(0.3) # artificial time delay

            self.finished.emit(i,test_va)




class test_multicore(QWidget):
    """Demo window with a "test" button and a list widget showing entries."""

    def __init__(self, parent=None):
        """Build the window: one button stacked above one list widget."""
        QWidget.__init__(self, parent)

        self.setGeometry(300, 300, 280, 600)
        self.setWindowTitle('Qthreads and multicore')

        # NOTE(review): this attribute presumably hides QWidget.layout() on
        # the instance -- confirm nothing relies on calling that method.
        self.layout = QVBoxLayout(self)

        self.testButton = QPushButton("test")
        self.testButton.clicked.connect(self.test)

        self.listwidget = QListWidget(self)

        self.layout.addWidget(self.testButton)
        self.layout.addWidget(self.listwidget)
        # Not used anywhere else in this class.
        self.threadPool = []

    def add(self, text,random_matrix): 
        """Print *text* and *random_matrix*, then add *text* to the list.

        The list is re-sorted after every insertion. This method is also the
        slot connected to the worker thread's ``finished`` signal in test().
        """
        print ("Add: " + str(text) +str(random_matrix))
        self.listwidget.addItem(str(text))
        self.listwidget.sortItems()

    def addBatch(self,text="text",iters=6,delay=0.3): 
        """Add *iters* entries named "<text> <i>", sleeping *delay* s before each.

        The sleep happens in the calling thread; test() calls this directly.
        """
        for i in range(iters):
            time.sleep(delay) # artificial time delay
            self.add(text+" "+str(i), 0)


    def test(self):
        """Button handler: add the batch entries, then start a WorkThread."""
        self.listwidget.clear()

        self.addBatch("_non_thread_entries",iters=6,delay=0.3)




        # Keep a reference on the instance so the thread object stays alive
        # after this method returns.
        self.workThread = WorkThread()
        # Select the (int, object) overload of the signal and route it to add().
        self.workThread.finished[int,object].connect(self.add)


        self.workThread.start()






# run
# Module-level start-up: create the application, show the window and enter
# the Qt event loop.
# NOTE(review): these lines are not under an ``if __name__ == '__main__':``
# guard, so they execute on every import of this module. Presumably this is
# why the GUI appears once per pool worker -- TODO confirm.
app = QApplication(sys.argv)
test = test_multicore()
test.show()
app.exec_()

I've also tried using a QObject and moving it to a thread with moveToThread, but ran into the same issue.

Question: How do I implement multicore processing in my Python 3.4 PyQT5 application? Consider that I will deploy using cx_freeze on Windows and Mac.

like image 461
HeoN Avatar asked Oct 31 '22 07:10

HeoN


2 Answers

I was trying to figure out how to launch multiple windows, each under its own umbrella (which is roughly what this question is asking). The first thing I found out was that a QWidget (or any widget, for that matter) cannot be used within a sub-thread: widgets must reside in the main thread only. (I learned this from a question of mine that was erroneously dubbed a duplicate, but at least I got a correct answer.) So the question became: can I create more than one main thread? It turns out that I can, by using multiprocessing, but that was not simple, because none of the examples I could find showed how it could be done. In case anyone else is struggling with this, I thought I would post my basic implementation to give you a potential path should you wish to implement something like this yourself.

It also gives a potential answer to this question. Specifically, this example shows not only how to launch two windows in different processes, but also how each of those windows can run a secondary thread -- so a sort of multi-answer for multi-processing.

import sys
import time
import multiprocessing as mlti

from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *

class WorkerGUI(QRunnable):
    """Runnable that stores a zero-argument callable and invokes it in run()."""

    def __init__(self, InFunc):
        super().__init__()
        # The callable to execute once the runnable is scheduled.
        self.Func = InFunc

    @pyqtSlot()
    def run(self):
        """Call the stored callable once."""
        target = self.Func
        target()

class ChildWindow(QWidget):
    """Small window with a toggle button and a counter label.

    The first click on the button starts a worker in a QThreadPool that
    increments a counter while counting is switched on; later clicks pause
    and resume it. Closing the window stops the worker.
    """

    # Carries the new counter text from the worker thread to the GUI thread.
    # Widgets must only be touched from the thread that owns them, so the
    # worker emits this signal instead of calling QLabel.setText() itself.
    counterChanged = pyqtSignal(str)

    def __init__(self, name):
        """Build the window; *name* is the title and selects the position."""
        QWidget.__init__(self)
        self.Name = name
        print('Name :',name)
        self.setWindowTitle(self.Name)
        if name == 'One':
            self.setGeometry(100,100,250,100)
        else:
            self.setGeometry(100,500,250,100)
        self.CountThis = False   # True while the counter should advance
        self.StartThis = True    # True until the worker has been started
        self.RunThis   = True    # False tells the worker loop to exit

        self.btnCnt = QPushButton('Cnt')
        self.btnCnt.clicked.connect(self.CountMe)

        self.lblCntr = QLabel()
        self.lblCntr.setText('0')
        # The label lives in the GUI thread, so emissions coming from the
        # worker thread are delivered to it through the event loop.
        self.counterChanged.connect(self.lblCntr.setText)

        HBox = QHBoxLayout()
        HBox.addWidget(self.btnCnt)
        HBox.addWidget(QLabel('     '))
        HBox.addWidget(self.lblCntr)
        HBox.addStretch(1)

        VBox = QVBoxLayout()
        VBox.addLayout(HBox)
        VBox.addStretch(1)

        self.setLayout(VBox)
        self.WrkrGUI = WorkerGUI(self.CountOnMe)

    def CountMe(self):
        """Toggle counting; start the worker on the first activation."""
        if self.CountThis:
            self.btnCnt.setText('Cnt')
            self.CountThis = False
        else:
            self.btnCnt.setText('Off')
            self.CountThis = True
            if self.StartThis:
                self.StartThis = False
                self.threadpool = QThreadPool()
                self.threadpool.start(self.WrkrGUI)

    def CountOnMe(self):
        """Worker loop: advance the counter every 10 ms while counting is on.

        Runs in a pool thread, so it reports each new value through
        ``counterChanged`` rather than touching the label directly.
        """
        cnt = 0
        while self.RunThis:
            if self.CountThis:
                cnt += 1
                self.counterChanged.emit(str(cnt))
            time.sleep(.01)

    def closeEvent(self, event):
        """Stop the worker loop when the window closes."""
        # Without this the loop in CountOnMe() never ends and the pool thread
        # outlives the window.
        self.CountThis = False
        self.RunThis = False
        QWidget.closeEvent(self, event)

def _run_child_window(name):
    """Create a QApplication in this process and show one ChildWindow.

    Blocks in the Qt event loop and then exits the process with the loop's
    return code.
    """
    MainThred = QApplication([])

    # Keep a local reference so the window is not garbage-collected while
    # the event loop runs.
    ChildGUI = ChildWindow(name)
    ChildGUI.show()

    sys.exit(MainThred.exec_())


def Process2():
    """Process entry point for the window titled 'Two'."""
    _run_child_window('Two')


def Process1():
    """Process entry point for the window titled 'One'."""
    _run_child_window('One')

if __name__ == "__main__":
    # Each entry point creates its own QApplication, so every window runs in
    # the main thread of a separate process.
    multiprcess1, multiprcess2 = (
        mlti.Process(target=entry) for entry in (Process1, Process2)
    )

    # Both process objects exist before either one is started.
    for child in (multiprcess1, multiprcess2):
        child.start()
like image 31
Dennis Jensen Avatar answered Nov 30 '22 06:11

Dennis Jensen


Adding

if __name__ == '__main__':

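before the QApplication is created ensures that the app is created only once. Without the guard, every worker process that re-imports this module when multiprocessing starts it (as happens with the spawn start method used on Windows) would run the module-level GUI code again and open its own window.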

This is a working example of multiprocessing with PyQt5 on Python 3.4:

import sys, time
from PyQt5.QtCore import * 
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *

import multiprocessing as mp
import numpy

def _cube(x):
    """Return ``(x, x**3)``.

    Kept at module level on purpose: the pool has to pickle the callable it
    sends to a worker process, and a plain module-level function is pickled
    by name. A bound method would drag its QThread instance into the pickle.
    """
    return x,x**3


class WorkThread(QThread):
    """Thread that cubes slices of a random array in a process pool.

    ``run`` builds a random 4x2x3 array, cubes every ``aa[:, :, x]`` slice in
    a 4-process pool, and then emits ``finished`` five times, 300 ms apart,
    with the loop index and the collected results.
    """
    # Signal carrying (loop index, result array). The name is kept because
    # callers connect to ``finished[int, object]``.
    finished = pyqtSignal(int,object)

    def __del__(self):
        # Do not let the thread object be destroyed while it is running.
        self.wait()

    def cube(self,x):
        """Return ``(x, x**3)``; kept for callers that use the method form."""
        return _cube(x)

    def run(self):
        """Compute the cubes in a process pool, then emit the results."""
        aa = numpy.random.rand(4,2,3)

        # The context manager shuts the workers down once all results have
        # been fetched, instead of leaving a pool open after every run.
        with mp.Pool(processes=4) as pool:
            # Submit the module-level function, not ``self.cube``, so nothing
            # Qt-related has to be pickled.
            results = [pool.apply_async(_cube, args=(aa[:,:,x],)) for x in range(aa.shape[2])]
            output = [p.get() for p in results]
        test_va = numpy.asarray(output)

        for i in range(5):
            # QThread.sleep() takes whole seconds; msleep() expresses the
            # intended 0.3 s artificial delay.
            QThread.msleep(300)

            self.finished.emit(i,test_va)




class test_multicore(QWidget):
    """Demo window: a "test" button stacked above a list of added entries."""

    def __init__(self, parent=None):
        """Create the widgets and wire the button to test()."""
        super().__init__(parent)

        self.setGeometry(300, 300, 280, 600)
        self.setWindowTitle('Qthreads and multicore')

        self.layout = QVBoxLayout(self)

        self.testButton = QPushButton("test")
        self.testButton.clicked.connect(self.test)

        self.listwidget = QListWidget(self)

        # Button first, list below it.
        for widget in (self.testButton, self.listwidget):
            self.layout.addWidget(widget)
        self.threadPool = []

    def add(self, text,random_matrix):
        """Print the entry and its matrix, then insert the entry, sorted."""
        label = str(text)
        print ("Add: " + label + str(random_matrix))
        self.listwidget.addItem(label)
        self.listwidget.sortItems()

    def addBatch(self,text="text",iters=6,delay=0.3):
        """Insert *iters* numbered entries, pausing *delay* s before each."""
        for index in range(iters):
            time.sleep(delay)  # artificial time delay
            self.add(" ".join((text, str(index))), 0)

    def test(self):
        """Button handler: refill the list, then launch the worker thread."""
        self.listwidget.clear()

        self.addBatch("_non_thread_entries", 6, 0.3)

        # Stored on the instance so the thread object outlives this call.
        worker = self.workThread = WorkThread()
        worker.finished[int,object].connect(self.add)
        worker.start()






# run
if __name__ == '__main__':
    # Must come first in the guarded block: in a frozen Windows executable
    # (e.g. one built with cx_Freeze) this is what lets a spawned worker run
    # its task instead of starting the GUI again. It does nothing otherwise.
    mp.freeze_support()

    app = QApplication(sys.argv)
    test = test_multicore()
    test.show()
    # Hand the event loop's return code back to the operating system.
    sys.exit(app.exec_())

To use apply_async, substitute:

# Fragment meant to replace the corresponding lines inside run(); it relies
# on ``pool`` and ``aa`` from the surrounding code.
# NOTE(review): ``cube`` is used here as a bare name rather than
# ``self.cube`` -- presumably a module-level ``cube`` function is assumed;
# confirm.
results = [pool.apply_async(cube, args=(aa[:,:,x],)) for x in range(0,aa.shape[2])]
# get() blocks until each task has finished; results keep submission order.
output = [p.get() for p in results]
test_va = numpy.asarray( output)
like image 87
HeoN Avatar answered Nov 30 '22 06:11

HeoN