Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to return value from function running by QThread and Queue

Please explain how to send data to, and receive data from, a thread managed by a Queue.

First I subclass 'QThread', defining its run() method, which is started when QThread's .start() is called:

class SimpleThread(QtCore.QThread):
    """Worker thread that pulls arguments off a shared queue and processes them."""

    def __init__(self, queue, parent=None):
        """Store the work queue; ``parent`` is passed through to QThread."""
        QtCore.QThread.__init__(self, parent)
        self.queue = queue

    def run(self):
        """Loop forever: take one argument, run fun() on it, mark it done."""
        while True:
            job = self.queue.get()
            # The value returned by fun() is not captured here.
            self.fun(job)
            self.queue.task_done()

    def fun(self, arg):
        """Print three progress lines, calling self.sleep(1) after each, and return arg + 1."""
        step = 0
        while step < 3:
            print('fun: %s' % step)
            self.sleep(1)
            step += 1
        return arg + 1

Then I declare two Thread instances (so only two CPU cores are taken) sending self.queue instance as an argument.

# One queue instance shared by every worker thread.
self.queue=queue.Queue()
# Create and start two workers; each receives the same queue.
for i in range(2):
    thread=SimpleThread(self.queue)
    thread.start()

Now if I understand it correctly thread.start() is not starting anything. The real "start" happens only when I call queue.put():

# Put three arguments on the shared queue.
for arg in [1,2,3]: self.queue.put(arg)

This last line is what makes a "real" call. Aside from creating and starting a Queue item, put() allows saving any arbitrary value to each Queue item. .put() does several things at once: it creates, it starts, it moves the processing through the Queue, and it allows placing a variable "inside" of the queue item (which can later be retrieved from inside of the function-processor using the Queue item's `.get()` method).

But how do I return the value from the fun() function? A "regular" fun()'s return resultValue doesn't work. And I can't use the self.queue.put() method, since this method, aside from storing data, "creates" a new queue item...

EDITED LATER:

Here is slightly tweaked code (copy/pasted from another post) showing an approach to returning a value from a completed Thread. I am not sure if the approach used here would work with QThread... please correct me if I am wrong:

import os, sys
import threading
import Queue

def callMe(incomingFun, daemon=False):
    """Decorate ``incomingFun`` so that calling it runs it in a new thread.

    The decorated function returns the started ``threading.Thread`` right
    away.  The thread carries a ``result_queue`` attribute; the return value
    of ``incomingFun`` is put on that queue when the call finishes, so
    ``thread.result_queue.get()`` blocks until the result is available.

    If ``incomingFun`` raises, nothing is put on the queue.

    Args:
        incomingFun: The callable to run in a background thread.
        daemon: Value assigned to the ``daemon`` attribute of each thread.

    Returns:
        A wrapper that accepts the same arguments as ``incomingFun``.
    """
    import functools

    def execute(_queue, *args, **kwargs):
        # Runs inside the worker thread: compute, then hand the result back.
        _queue.put(incomingFun(*args, **kwargs))

    # Keep the wrapped function's name and docstring on the wrapper.
    @functools.wraps(incomingFun)
    def wrap(*args, **kwargs):
        _queue = Queue.Queue()
        _thread = threading.Thread(
            target=execute, args=(_queue,) + args, kwargs=kwargs)
        _thread.daemon = daemon
        # Attach the queue before starting so it is always present on the
        # returned thread object.
        _thread.result_queue = _queue
        _thread.start()
        return _thread

    return wrap

@callMe
def localFunc(x):
    """Add 5 to ``x``, sleep for five seconds, and return the sum."""
    from time import sleep
    total = x + 5
    sleep(5)
    return total

# Calling the decorated function returns the thread object that was started.
thread=localFunc(10)

# this blocks, waiting for the result
result = thread.result_queue.get()
print result
like image 616
alphanumeric Avatar asked Aug 03 '14 19:08

alphanumeric


1 Answer

In normal circumstances you'd use a result queue to send results back, and then have some other thread running that waits for the results:

class SimpleThread(QtCore.QThread):
    """Worker thread that reads arguments from one queue and posts results to another."""

    def __init__(self, queue, result_queue, parent=None):
        """Keep both queues; ``parent`` is passed through to QThread."""
        QtCore.QThread.__init__(self, parent)
        self.queue = queue
        self.result_queue = result_queue

    def run(self):
        """Loop forever: take one argument, process it, mark it done."""
        while True:
            job = self.queue.get()
            self.fun(job)
            self.queue.task_done()

    def fun(self, arg):
        """Print three progress lines, then put arg + 1 on the result queue."""
        step = 0
        while step < 3:
            print('fun: %s' % step)
            self.sleep(1)
            step += 1
        self.result_queue.put(arg + 1)

def handle_results(result_queue):
    """Consume results forever, printing each one as it arrives.

    Each iteration blocks in ``result_queue.get()`` until a result is
    available.  The loop has no exit condition of its own.
    """
    while True:
        message = "Got result {}".format(result_queue.get())
        print(message)

Main thread:

self.queue=queue.Queue()
self.result_queue = queue.Queue()

# The target's arguments must be passed as a tuple via ``args``; a bare
# positional argument after ``target=`` is a syntax error.
result_handler = threading.Thread(target=handle_results, args=(self.result_queue,))
# handle_results() loops forever, so run it as a daemon thread to avoid
# keeping the interpreter alive at exit.
result_handler.daemon = True
result_handler.start()

# Keep a reference to every worker so none is garbage-collected while running.
self.threads = []
for i in range(2):
    thread=SimpleThread(self.queue, self.result_queue)
    self.threads.append(thread)
    thread.start()

Doing it this way will keep you from blocking the GUI's event loop while you wait for the results. Here's what the equivalent would look like with multiprocessing.pool.ThreadPool:

from multiprocessing.pool import ThreadPool
import time


def fun(arg):
    """Print three progress lines, sleeping one second after each, and return arg + 1."""
    for i in range(3):
        # Parenthesised so the line is valid on both Python 2 and Python 3,
        # matching the print(...) call style used by handle_result.
        print('fun: %s' % i)
        time.sleep(1)
    return arg + 1

def handle_result(result):
    """Print the value handed to this callback."""
    message = "got result {}".format(result)
    print(message)

# Pool of two worker threads.
pool = ThreadPool(2)
# map_async returns without waiting; handle_result is called with the list of
# results once every call to fun has finished.
pool.map_async(fun, [1,2,3], callback=handle_result)

Which is a lot simpler. It internally creates a result handling thread, which will automatically call handle_result for you once all of the fun calls have completed, passing it the list of results.

That said, you're using QThread, and you want the results to update GUI widgets, so you really want your results to be sent back to the main thread, not to a result handling thread. In that case, it makes sense to use Qt's signaling system, so that you can safely update the GUI when you receive the result:

from PyQt4 import QtCore, QtGui
import sys
import Queue as queue

class ResultObj(QtCore.QObject):
    """Small container that carries one result value in its ``val`` attribute."""
    def __init__(self, val):
        # NOTE(review): the QObject base initializer is not called here - confirm that is intended.
        self.val = val

class SimpleThread(QtCore.QThread):
    """Worker thread that processes queued arguments and emits each result as a signal."""

    # Emitted once per processed argument, carrying a ResultObj.
    # NOTE(review): QThread itself appears to provide a signal named
    # ``finished``; this attribute reuses that name - confirm intended.
    finished = QtCore.pyqtSignal(object)

    def __init__(self, queue, callback, parent=None):
        """Store the work queue and connect ``callback`` to the result signal."""
        QtCore.QThread.__init__(self, parent)
        self.queue = queue
        self.finished.connect(callback)

    def run(self):
        """Process queued arguments until a ``None`` is received."""
        job = self.queue.get()
        while job is not None:
            self.fun(job)
            job = self.queue.get()
        # None means exit
        print("Shutting down")

    def fun(self, arg):
        """Print three progress lines, then emit ResultObj(arg + 1)."""
        for step in (0, 1, 2):
            print('fun: %s' % step)
            self.sleep(1)
        self.finished.emit(ResultObj(arg + 1))


class AppWindow(QtGui.QMainWindow):
    """Main window with a single 'Process' button that hands work to worker threads."""

    def __init__(self):
        """Build the window: a central widget holding one push button."""
        super(AppWindow, self).__init__()
        central = QtGui.QWidget()
        self.setCentralWidget(central)
        layout = QtGui.QVBoxLayout()
        central.setLayout(layout)
        process_button = QtGui.QPushButton('Process')
        process_button.clicked.connect(self.process)
        layout.addWidget(process_button)

    def handle_result(self, result):
        """Print the ``val`` attribute of a result delivered by a worker's signal."""
        print("got val {}".format(result.val))
        # You can update the UI from here.

    def process(self):
        """Start the workers, queue the arguments, then queue the shutdown markers."""
        worker_count = 2
        self.queue = queue.Queue()
        self.threads = []
        for _ in range(worker_count):
            worker = SimpleThread(self.queue, self.handle_result)
            self.threads.append(worker)
            worker.start()

        for arg in (1, 2, 3):
            self.queue.put(arg)

        # Tell the workers to shut down: one None per worker.
        for _ in range(worker_count):
            self.queue.put(None)

# Create the application and window, show the window, then run the event
# loop and exit with its return code.
app = QtGui.QApplication([])
window = AppWindow()
window.show()
sys.exit(app.exec_())

Output when the button is pushed:

fun: 0
 fun: 0
fun: 1
 fun: 1
fun: 2
 fun: 2
fun: 0
got val 2
got val 3
Shutting down
fun: 1
fun: 2
Shutting down
got val 4
like image 176
dano Avatar answered Sep 22 '22 22:09

dano