
Asynchronous call within a GUI application using multiprocessing

I have a GUI application that needs to fetch and parse diverse resources from the network alongside the GUI main loop. I've looked into the Python multiprocessing module, because these fetch actions involve not only blocking I/O but also heavy parsing, so multiprocessing may be a better option here than Python threads (CPU-bound parsing in threads would be serialized by the GIL). Twisted would make this easy, but Twisted is not an option this time.

I found a simple solution here:

Python subprocess: callback when cmd exits

The problem is that the callback is, somewhat mysteriously, not called in the MainThread.
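A likely explanation, assuming the linked solution starts a watcher thread that waits for the child process and then calls the callback: a callback simply runs in whatever thread invokes it. A minimal sketch; popen_and_call and on_exit are hypothetical names, not part of any library:

```python
import subprocess
import sys
import threading

def popen_and_call(on_exit, args):
    """Hypothetical helper: start a child process and call on_exit(returncode)
    from a watcher thread once the child has finished."""
    def run_in_thread():
        proc = subprocess.Popen(args)
        # The callback is invoked here, i.e. in the watcher thread.
        on_exit(proc.wait())

    thread = threading.Thread(target=run_in_thread)
    thread.start()
    return thread

calls = []

def on_exit(returncode):
    # Record the return code and the name of the thread running the callback.
    calls.append((returncode, threading.current_thread().name))

if __name__ == '__main__':
    popen_and_call(on_exit, [sys.executable, '-c', 'pass']).join()
    print(calls)  # the recorded thread name is not 'MainThread'
```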

So I came up with the following solution:

delegate.py

import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging


_CALLBACKS = {}
_QUEUE = mp.Queue()

info = logging.getLogger(__name__).info


class Call(namedtuple('Call', 'id finished result error')):

    def attach(self, func):
        if not self.finished:
            _CALLBACKS.setdefault(self.id, []).append(func)
        else:
            func(self.result or self.error)

        return self

    def callback(self):
        assert self.finished, 'Call not finished yet'
        r = self.result or self.error
        for func in _CALLBACKS.pop(self.id, []):
            func(r)

    def done(self, result=None, error=None):
        assert not self.finished, 'Call already finished'
        return self._replace(finished=(-1 if error else 1),
            result=result, error=error)

    @classmethod
    def create(clss):
        call = clss(uuid.uuid4().hex, 0, None, None) # uuid ???
        return call

def run(q, cb, func, args=None, kwargs=None):
    info('run: try running %s' % func)
    try:
        cb = cb.done(result=func(*(args or ()), **(kwargs or {})))
    except Exception as err:
        cb = cb.done(error=err)
    q.put(cb)
    os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ???
    info('run: leaving')

def on_callback(sig, frame):
    info('on_callback: checking queue ...')
    c = _QUEUE.get(True, 2)
    info('on_callback: got call - %s' % repr(c))
    c.callback()

signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ???

def delegate(func, *args, **kwargs):
    info('delegate: %s %s' % (func, args,))
    cb = Call.create()
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
    return cb


__all__ = ['delegate']

usage

import logging
from time import sleep

from delegate import delegate

logging.basicConfig(level=logging.INFO)
info = logging.getLogger(__name__).info

def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    info('sleeper: will go to sleep for %s secs' % secs)
    sleep(secs)
    info('sleeper: woke up - returning result')
    return ['sleeper', 'result']

def on_sleeper_result(r):
    if isinstance(r, Exception):
        info('on_sleeper_result: got error: %s' % r)
    else:
        info('on_sleeper_result: got result: %s' % r)

delegate(sleeper, 3).attach(on_sleeper_result)
delegate(sleeper, -3).attach(on_sleeper_result)
while 1:
    info('main: loop')
    sleep(1)

output

0122 08432 MainThread INFO  delegate: <function sleeper at 0x163e320> (3,)
0123 08432 MainThread INFO  delegate: <function sleeper at 0x163e320> (-3,)
0124 08437 MainThread INFO  run: try running <function sleeper at 0x163e320>
0124 08437 MainThread INFO  sleeper: will go to sleep for 3 secs
0124 08432 MainThread INFO  main: loop
0125 08438 MainThread INFO  run: try running <function sleeper at 0x163e320>
0126 08438 MainThread INFO  run: leaving
0126 08432 MainThread INFO  on_callback: checking queue ...
0126 08432 MainThread INFO  on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',))
0127 08432 MainThread INFO  on_sleeper_result: got error: I need my Augenpflege
0127 08432 MainThread INFO  main: loop
1128 08432 MainThread INFO  main: loop
2129 08432 MainThread INFO  main: loop
3127 08437 MainThread INFO  sleeper: woke up - returning result
3128 08437 MainThread INFO  run: leaving
3128 08432 MainThread INFO  on_callback: checking queue ...
3129 08432 MainThread INFO  on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None)
3129 08432 MainThread INFO  on_sleeper_result: got result: ['sleeper', 'result']
3129 08432 MainThread INFO  main: loop
4130 08432 MainThread INFO  main: loop
5132 08432 MainThread INFO  main: loop
...
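Why the callbacks in the log above run in the parent's MainThread (PID 08432): CPython always executes Python-level signal handlers in the main thread, whichever thread the signal was delivered to. A minimal POSIX-only sketch (SIGUSR2 does not exist on Windows); on_usr2, handled_in and signal_from_other_thread are hypothetical names:

```python
import os
import signal
import threading
import time

handled_in = []

def on_usr2(signum, frame):
    # Record which thread CPython runs the Python-level handler in.
    handled_in.append(threading.current_thread().name)

# Handlers can only be installed from the main thread.
signal.signal(signal.SIGUSR2, on_usr2)

def signal_from_other_thread():
    # Send SIGUSR2 to our own process from a non-main thread.
    t = threading.Thread(target=os.kill, args=(os.getpid(), signal.SIGUSR2))
    t.start()
    t.join()
    # The handler runs the next time the main thread executes Python code;
    # wait briefly so that it has had the chance.
    for _ in range(100):
        if handled_in:
            break
        time.sleep(0.01)
    return list(handled_in)

if __name__ == '__main__':
    print(signal_from_other_thread())  # ['MainThread']
```

Two consequences that seem relevant to the question: the handler interrupts the main thread wherever it happens to be between bytecodes, so it can re-enter your own code (for example while _CALLBACKS is being modified); and CPython does not queue repeated signals, so if two workers send SIGUSR2 before the handler runs, on_callback may run only once and leave a Call in _QUEUE. With a GUI toolkit whose event loop is written in C or C++, the handler may also be delayed until control returns to the Python interpreter.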

So far this works pretty well, but my experience with the multiprocessing module is moderate and I'm a bit unsure whether this will run without side effects. My question is: what should I especially watch out for when using multiprocessing this way, or are there 'more correct' patterns for an asynchronous callback mechanism using the Python standard library?

hooblei, asked Oct 25 '22 01:10


2 Answers

There is no reason to use signals (a low-level API) with Python multiprocessing, or to busy-wait in the main loop.

Instead, run your (modified) event loop, i.e. the loop that reads _QUEUE, in a QThread. From there it can call thread-safe Qt code directly, or use QApplication.postEvent (or a pyqtSignal) to have the callback executed in the main thread:

# this should be in the delegate module
while 1:
    c = _QUEUE.get(True) # no timeout
    c.callback() # or post event to main thread
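A minimal, stdlib-only sketch of that pattern, assuming Qt is not available for the example: a plain queue.Queue (posted) stands in for QApplication.postEvent, and a threading.Thread stands in for the QThread. The names work, reader and posted are hypothetical. The background thread blocks on the multiprocessing queue with no signals or timeouts and only forwards callbacks; the "main loop" runs them, so they execute in MainThread.

```python
import multiprocessing as mp
import queue
import threading

def work(call_id, x, results):
    # Worker process: do the (here trivial) job and report (call_id, value).
    results.put((call_id, x * x))

def reader(results, posted, callbacks, n):
    # Background thread: block on the multiprocessing queue and hand each
    # callback to the main thread instead of calling it here.
    # `posted` is a plain queue.Queue standing in for QApplication.postEvent.
    for _ in range(n):
        call_id, value = results.get()
        posted.put((callbacks[call_id], value))

def main(n=3):
    results = mp.Queue()
    posted = queue.Queue()
    received = []

    def on_result(value):
        received.append((value, threading.current_thread().name))

    callbacks = {i: on_result for i in range(n)}
    procs = [mp.Process(target=work, args=(i, i + 1, results)) for i in range(n)]
    for p in procs:
        p.start()
    threading.Thread(target=reader, args=(results, posted, callbacks, n),
                     daemon=True).start()
    # Stand-in for the GUI main loop: run whatever callbacks were posted.
    while len(received) < n:
        func, value = posted.get(timeout=10)
        func(value)
    for p in procs:
        p.join()
    return sorted(received)

if __name__ == '__main__':
    print(main())  # [(1, 'MainThread'), (4, 'MainThread'), (9, 'MainThread')]
```

In a real Qt application the while loop would be the Qt event loop itself, and the hand-off would be a queued signal or postEvent rather than queue.Queue.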

See also this page for a discussion of communicating between threads in Qt.

Mihai Stan, answered Oct 27 '22 11:10


Your code works, but it isn't as simple as it could be. Let's step through the code.

This creates a Call instance in the main process:

def delegate(func, *args, **kwargs):
    cb = Call.create()

but when you pass cb to the worker process,

mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()

the Call instance is copied into the child process (by os.fork on Linux; on Windows, or with the spawn start method, it is pickled instead), creating a second, separate instance. The worker then calls cb.done, which calls cb._replace, which returns a third Call instance:

def done(self, result=None, error=None):
    assert not self.finished, 'Call already finished'
    return self._replace(finished=(-1 if error else 1),
        result=result, error=error)

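The above calls the namedtuple method _replace (despite the leading underscore it is documented namedtuple API; the underscore only avoids clashes with field names). It could have been a simple Python statement like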

self.finished = -1 if error else 1

if Call were a subclass of object instead of a subclass of namedtuple. Subclassing namedtuple saves a bit of typing in __init__, but it becomes clumsy later on: namedtuple instances are immutable, so every change to finished, result or error has to build a new instance.
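A small sketch contrasting the two designs; TupleCall, ObjectCall and can_assign are hypothetical names. The namedtuple version returns a new object from done() and rejects attribute assignment, while the plain-class version updates the same instance in place:

```python
from collections import namedtuple

class TupleCall(namedtuple('TupleCall', 'id finished result error')):
    """Immutable, like the question's Call: every update builds a new tuple."""
    def done(self, result=None, error=None):
        return self._replace(finished=(-1 if error else 1),
                             result=result, error=error)

class ObjectCall(object):
    """Hypothetical mutable alternative: done() updates the instance in place."""
    def __init__(self, call_id, finished=0, result=None, error=None):
        self.id = call_id
        self.finished = finished
        self.result = result
        self.error = error

    def done(self, result=None, error=None):
        self.finished = -1 if error else 1
        self.result = result
        self.error = error
        return self

def can_assign(obj):
    # True if obj.finished can be assigned directly.
    try:
        obj.finished = 1
    except AttributeError:
        return False
    return True

t = TupleCall('a', 0, None, None)
t2 = t.done(result=42)
o = ObjectCall('a')
o2 = o.done(result=42)

print(t2 is t, t.finished, t2.finished)  # False 0 1
print(o2 is o, o.finished)               # True 1
print(can_assign(TupleCall('b', 0, None, None)), can_assign(ObjectCall('b')))  # False True
```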

Meanwhile, the original Call instance returned by delegate(...) in the main process calls attach:

delegate(...).attach(on_sleeper_result)

This modifies the global _CALLBACKS dict in the main process. The worker processes have no way of knowing about this change; in the workers, _CALLBACKS is still the empty dict. So you have to pass the worker's Call instance back to the main process through an mp.Queue, and the main process then uses cb.id to look up the right functions in _CALLBACKS.
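A small sketch of that point, using a hypothetical worker() and a module-level _CALLBACKS like the question's: an entry registered in the parent after the child has started is invisible to the child, the child's own writes never reach the parent, and the only way back is through something like an mp.Queue.

```python
import multiprocessing as mp

_CALLBACKS = {}  # module-level registry, as in the question

def worker(q):
    # The child's _CALLBACKS is a copy (fork) or a freshly imported dict
    # (spawn/forkserver); either way, this write stays in the child.
    _CALLBACKS['from_child'] = True
    q.put(sorted(_CALLBACKS))

def main():
    _CALLBACKS.clear()  # start from an empty registry
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    _CALLBACKS['from_parent'] = True  # registered after the child started
    child_view = q.get(timeout=10)
    p.join()
    return child_view, sorted(_CALLBACKS)

if __name__ == '__main__':
    print(main())  # (['from_child'], ['from_parent'])
```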

So it all works, but it creates three Call instances for every call to delegate, and the code might mislead the uninitiated into thinking they are all the same object. It's more complicated than it needs to be.

Have you considered using mp.Pool.apply_async's callback parameter instead?

import multiprocessing as mp
import logging
import time
import collections

_CALLBACKS=collections.defaultdict(list)

logger=mp.log_to_stderr(logging.DEBUG)

def attach(name,func):
    _CALLBACKS[name].append(func)

def delegate(func, *args, **kwargs):
    id=kwargs.pop('id')
    try:
        result=func(*args,**kwargs)
    except Exception as err:
        result=err
    return (id,result)

def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    logger.info('sleeper: will go to sleep for %s secs' % secs)
    time.sleep(secs)
    logger.info('sleeper: woke up - returning result')
    return ['sleeper', 'result']

def callback(r):
    id,result=r
    for func in _CALLBACKS[id]:
        func(result)

def on_sleeper_result(r):
    if isinstance(r, Exception):
        logger.error('on_sleeper_result: got error: %s' % r)
    else:
        logger.info('on_sleeper_result: got result: %s' % r)

if __name__=='__main__':
    pool=mp.Pool()
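    # Register the callbacks *before* submitting the tasks: the pool's
    # result-handler thread may call callback() as soon as a task finishes.
    attach(1,on_sleeper_result)
    pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1},
                     callback=callback)
    attach(2,on_sleeper_result)
    pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2},
                     callback=callback)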
    while 1:
        logger.info('main: loop')
        time.sleep(1)
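One thing worth knowing with this approach: apply_async's callback runs in the main process, but in the pool's result-handler thread rather than in MainThread (the original concern), and the multiprocessing docs ask that callbacks complete immediately, since they block the handling of other results. In Python 3.2+ apply_async also accepts error_callback, which could replace the catch-all in delegate. A minimal sketch; run_demo, on_result and on_error are hypothetical names:

```python
import multiprocessing as mp
import threading

def sleeper(secs):
    # Same contract as the answer's sleeper, without the actual sleep.
    assert secs >= 1, 'I need my Augenpflege'
    return ['sleeper', 'result']

def run_demo():
    events = []

    def on_result(result):
        # Called in the main process, but from the pool's result-handler thread.
        events.append(('result', result, threading.current_thread().name))

    def on_error(exc):
        # error_callback receives the exception raised in the worker.
        events.append(('error', repr(exc), threading.current_thread().name))

    with mp.Pool(2) as pool:
        for secs in (3, -3):
            pool.apply_async(sleeper, (secs,), callback=on_result,
                             error_callback=on_error)
        pool.close()
        pool.join()  # waits until all results (and their callbacks) are handled
    return events

if __name__ == '__main__':
    for event in run_demo():
        print(event)
```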

unutbu, answered Oct 27 '22 11:10