
PyQt: QThreadPool with QRunnables taking time to quit

I have a class that creates QRunnables and starts them in a QThreadPool instance.

My threads work well, but when the user wants to quit the application, it takes a long time to stop, presumably because the requests already launched take time to complete.

Here is a snippet showing how I use QThreadPool and QRunnable:

import sys
from PyQt5.Qt import QThreadPool, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel
from PyQt5.Qt import QRunnable


class BackendQRunnable(QRunnable):
    """
        Class that creates a QRunnable to trigger requests
    """

    def __init__(self, task):
        super(BackendQRunnable, self).__init__()
        self.task = task

    def run(self):
        """
        Run the QRunnable. Trigger actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class that creates BackendQRunnable instances to periodically query a backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.pool = QThreadPool.globalInstance()
        self.tasks = self.get_tasks()

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        timer = QTimer(self)
        timer.setInterval(10000)
        timer.start()
        timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the tasks to run in BackendQRunnable

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        for cur_task in self.tasks:
            backend_thread = BackendQRunnable(cur_task)

            # Add task to QThreadPool
            self.pool.start(backend_thread)

    def exit_pool(self):
        """
        Exit all BackendQRunnables and delete QThreadPool

        """

        # When trying to quit, the application takes a long time to stop
        self.pool.globalInstance().waitForDone()
        self.pool.deleteLater()

        sys.exit(0)


if __name__ == '__main__':
    app = QApplication(sys.argv)

    thread_manager = ThreadManager()
    thread_manager.start()

    layout = QVBoxLayout()

    label = QLabel("Start")
    button = QPushButton("DANGER!")
    button.pressed.connect(thread_manager.exit_pool)

    layout.addWidget(label)
    layout.addWidget(button)

    w = QWidget()

    w.setLayout(layout)
    w.show()

    sys.exit(app.exec_())

In the exit_pool function, I wait until the threads have finished and then delete the QThreadPool instance.

Is there a way to stop everything immediately, without waiting for each thread?

EDIT Solution:

So I approached the problem differently. I replaced my QRunnable with a QThread, removed the QThreadPool, and now manage the threads myself in a list. I also added a pyqtSignal so that I can stop the QTimer and close the running threads with the quit() function.

That way, all my threads quit without problems.

import sys
from PyQt5.Qt import QThread, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel, pyqtSignal


class BackendQThread(QThread):
    """
        Class that creates a QThread to trigger requests
    """

    quit_thread = pyqtSignal(name='close_thread')

    def __init__(self, task):
        super(BackendQThread, self).__init__()
        self.task = task
        # Emitting quit_thread asks this thread to quit
        self.quit_thread.connect(self.quit)

    def run(self):
        """
        Run the actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class that creates BackendQThread instances to periodically query a backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.tasks = self.get_tasks()
        self.timer = QTimer()
        self.threads = []

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        self.timer.setInterval(10000)
        self.timer.start()
        self.timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the available tasks to run

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        # Here I reset the list of threads
        self.threads = []
        for cur_task in self.tasks:
            backend_thread = BackendQThread(cur_task)

            # Start the thread and keep a reference to it
            backend_thread.start()
            self.threads.append(backend_thread)

    def stop(self):
        """
        Stop the manager and close all QThreads

        """

        print("Stop tasks")
        self.timer.stop()
        for task in self.threads:
            task.quit_thread.emit()

        print("Tasks finished")


if __name__ == '__main__':
    app = QApplication(sys.argv)

    layout = QVBoxLayout()
    widget = QWidget()
    widget.setLayout(layout)

    thread_manager = ThreadManager()

    start_btn = QPushButton("Start")
    start_btn.clicked.connect(thread_manager.start)
    layout.addWidget(start_btn)

    stop_btn = QPushButton("Stop")
    stop_btn.clicked.connect(thread_manager.stop)
    layout.addWidget(stop_btn)

    widget.show()

    sys.exit(app.exec_())
asked Oct 01 '17 09:10 by Algorys


1 Answer

You cannot stop a QRunnable once it's started. However, there are a few simple things you can do to reduce the wait time in your example.

Firstly, you can stop the timer, so that it doesn't add any more tasks. Secondly, you can clear the thread-pool so that it removes any pending tasks. Thirdly, you could try setting a smaller maximum thread count to see whether it still achieves acceptable performance. By default, the thread-pool will use QThread.idealThreadCount() to set the maximum number of threads - which usually means one for each processor core on the system.
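As a rough sketch of these three suggestions, assuming PyQt5: the class name PoolShutdown, its shutdown() method and the max_threads and timeout_ms parameters are made up for illustration, and a private QThreadPool is used instead of the global instance so that the lower thread limit stays local. QTimer.stop(), QThreadPool.clear(), setMaxThreadCount() and waitForDone() are the actual Qt calls.

```python
from PyQt5.QtCore import QObject, QThreadPool, QTimer


class PoolShutdown(QObject):
    """Hypothetical helper that owns the periodic timer and a private pool."""

    def __init__(self, max_threads=2, parent=None):
        super().__init__(parent)
        self.timer = QTimer(self)
        # A private pool, so the limit does not affect other users
        # of QThreadPool.globalInstance().
        self.pool = QThreadPool(self)
        # Fewer threads means fewer running tasks to wait for on exit.
        self.pool.setMaxThreadCount(max_threads)

    def shutdown(self, timeout_ms=5000):
        # 1. Stop scheduling new batches of tasks.
        self.timer.stop()
        # 2. Drop tasks that are queued but have not started yet.
        self.pool.clear()
        # 3. Wait, with an upper bound, for tasks that are already running.
        return self.pool.waitForDone(timeout_ms)
```

Tasks that are already running still have to finish; waitForDone() returns False if they do not do so within the timeout.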

A final option is to provide a way to interrupt the code that executes in your runnables. This will only really be possible if the code runs a loop which can periodically check a flag to see if it should continue. In your example, it looks like you could use a single shared class attribute for the flag, since all the tasks call static methods. But if the code is not interruptible in this way, there is nothing else you can do - you will just have to wait for the currently running tasks to finish.
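The flag idea could look roughly like this, assuming PyQt5 and assuming the long request can be split into small steps. InterruptibleRunnable, stop_all() and the steps / step_seconds parameters are hypothetical names, time.sleep() stands in for one chunk of real work, and a threading.Event is used for the shared class attribute (a plain boolean would also do).

```python
import threading
import time

from PyQt5.QtCore import QRunnable, QThreadPool


class InterruptibleRunnable(QRunnable):
    """Hypothetical task that works in small steps and checks a flag."""

    # One flag shared by every instance, stored as a class attribute.
    stop_requested = threading.Event()

    def __init__(self, steps=50, step_seconds=0.1):
        super().__init__()
        self.steps = steps
        self.step_seconds = step_seconds
        self.completed_steps = 0

    def run(self):
        for _ in range(self.steps):
            # Give up as soon as a stop has been requested.
            if self.stop_requested.is_set():
                return
            time.sleep(self.step_seconds)  # stand-in for one chunk of work
            self.completed_steps += 1


def stop_all(pool, timeout_ms=2000):
    """Ask running tasks to stop, drop queued ones, then wait briefly."""
    InterruptibleRunnable.stop_requested.set()
    pool.clear()
    return pool.waitForDone(timeout_ms)
```

The wait is then bounded by the length of a single step rather than by the whole task, so the smaller the steps, the faster the application can quit.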

answered Oct 15 '22 09:10 by ekhumoro