I have a GUI application that needs to fetch and parse diverse resources from the network alongside the GUI main loop. I've looked for options using the Python multiprocessing module, since these fetch actions involve not only blocking IO but also heavy parsing, so multiprocessing may be a better option here than Python threads. It would be easy using Twisted, but this time Twisted is not an option.
I found a simple solution here:
Python subprocess: callback when cmd exits
The problem is that the callback isn't called within the MainThread.
So I came up with the following solution:
delegate.py
import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging

_CALLBACKS = {}
_QUEUE = mp.Queue()

info = logging.getLogger(__name__).info


class Call(namedtuple('Call', 'id finished result error')):

    def attach(self, func):
        if not self.finished:
            _CALLBACKS.setdefault(self.id, []).append(func)
        else:
            func(self.result or self.error)
        return self

    def callback(self):
        assert self.finished, 'Call not finished yet'
        r = self.result or self.error
        for func in _CALLBACKS.pop(self.id, []):
            func(r)

    def done(self, result=None, error=None):
        assert not self.finished, 'Call already finished'
        return self._replace(finished=(-1 if error else 1),
                             result=result, error=error)

    @classmethod
    def create(clss):
        call = clss(uuid.uuid4().hex, 0, None, None)  # uuid ???
        return call


def run(q, cb, func, args=None, kwargs=None):
    info('run: try running %s' % func)
    try:
        cb = cb.done(result=func(*(args or ()), **(kwargs or {})))
    except Exception as err:
        cb = cb.done(error=err)
    q.put(cb)
    os.kill(os.getppid(), signal.SIGUSR2)  # SIGUSR2 ???
    info('run: leaving')


def on_callback(sig, frame):
    info('on_callback: checking queue ...')
    c = _QUEUE.get(True, 2)
    info('on_callback: got call - %s' % repr(c))
    c.callback()


signal.signal(signal.SIGUSR2, on_callback)  # SIGUSR2 ???


def delegate(func, *args, **kwargs):
    info('delegate: %s %s' % (func, args,))
    cb = Call.create()
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
    return cb


__all__ = ['delegate']
usage
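import logging
from time import sleep

from delegate import delegate

# Assumed logging setup (not shown in the original post); the format is a
# guess meant to resemble the output below.
logging.basicConfig(
    level=logging.INFO,
    format='%(relativeCreated)04d %(process)05d %(threadName)s '
           '%(levelname)s %(message)s')
info = logging.getLogger(__name__).info


def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    info('sleeper: will go to sleep for %s secs' % secs)
    sleep(secs)
    info('sleeper: woke up - returning result')
    return ['sleeper', 'result']


def on_sleeper_result(r):
    if isinstance(r, Exception):
        info('on_sleeper_result: got error: %s' % r)
    else:
        info('on_sleeper_result: got result: %s' % r)


delegate(sleeper, 3).attach(on_sleeper_result)
delegate(sleeper, -3).attach(on_sleeper_result)

while 1:
    info('main: loop')
    sleep(1)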
output
0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,)
0123 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (-3,)
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320>
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs
0124 08432 MainThread INFO main: loop
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320>
0126 08438 MainThread INFO run: leaving
0126 08432 MainThread INFO on_callback: checking queue ...
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',))
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege
0127 08432 MainThread INFO main: loop
1128 08432 MainThread INFO main: loop
2129 08432 MainThread INFO main: loop
3127 08437 MainThread INFO sleeper: woke up - returning result
3128 08437 MainThread INFO run: leaving
3128 08432 MainThread INFO on_callback: checking queue ...
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None)
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result']
3129 08432 MainThread INFO main: loop
4130 08432 MainThread INFO main: loop
5132 08432 MainThread INFO main: loop
...
So far this works pretty well, but my experience with the multiprocessing module is moderate and I'm a bit unsure whether this will run without side effects. My question is: what should I especially watch out for when using multiprocessing in this way, or are there 'more correct' patterns for an asynchronous callback mechanism using the Python standard library?
There is no reason to use signals (a low-level API) with Python multiprocessing, or to busy-wait in the main loop.
Instead, run your (modified) event loop in a QThread, which can call Qt code directly or use QApplication.postEvent (or a pyqtSignal) to execute it in the main thread:
# this should be in the delegate module
while 1:
    c = _QUEUE.get(True)  # no timeout
    c.callback()  # or post event to main thread
You can also see this page for a discussion of communicating between threads in Qt.
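The blocking loop above can be sketched without any Qt dependency. This is only an illustration under stated assumptions: it swaps the QThread for a plain threading.Thread, and the names start_listener, dispatch and _SENTINEL are hypothetical, not part of the question's delegate module. In a real GUI, dispatch would post an event or emit a signal to the main thread rather than run the callback itself.

```python
import multiprocessing as mp
import threading

# Hypothetical shutdown marker: putting it on the queue stops the listener.
_SENTINEL = None


def start_listener(result_queue, dispatch):
    """Drain result_queue in a background thread, passing items to dispatch.

    result_queue.get() blocks without a timeout, so no signals and no
    polling in the main loop are needed.
    """
    def loop():
        while True:
            item = result_queue.get()  # blocks until a worker puts a result
            if item is _SENTINEL:
                break
            dispatch(item)  # in a GUI: post an event to the main thread

    thread = threading.Thread(target=loop)
    thread.daemon = True  # do not keep the interpreter alive on exit
    thread.start()
    return thread
```

With the question's module, a call such as start_listener(_QUEUE, lambda c: c.callback()) would replace both the SIGUSR2 handler and the os.kill call in run, at the cost of the callbacks running in the listener thread unless dispatch forwards them elsewhere.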
Your code works, but it isn't as simple as it could be. Let's step through the code.
This creates a Call instance in the main process:
def delegate(func, *args, **kwargs):
    cb = Call.create()
but when you pass cb to the worker process,
mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
the Call instance is copied during the os.fork call, thus creating a second, separate instance. The worker then calls cb.done, which calls cb._replace, which returns a third Call instance:
def done(self, result=None, error=None):
    assert not self.finished, 'Call already finished'
    return self._replace(finished=(-1 if error else 1),
                         result=result, error=error)
The above calls the namedtuple method _replace (the leading underscore only avoids clashes with field names; the method is part of namedtuple's documented API). It could have been a simple Python statement like
self.finished = -1 if error else 1
if Call were a subclass of object instead of a subclass of namedtuple. Subclassing namedtuple saved a bit of typing in __init__, but it becomes quite clumsy later on, since we need to modify the namedtuple's attributes.
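As a rough sketch of that suggestion, here is what Call might look like as a plain object subclass with mutable attributes. It is an illustration only, not the question's code: it keeps the same field names and the same finished convention (0, 1, -1), and it leaves out attach and callback for brevity.

```python
import uuid


class Call(object):
    """Mutable variant of the question's Call (a sketch, not the original)."""

    def __init__(self):
        self.id = uuid.uuid4().hex
        self.finished = 0  # 0 = pending, 1 = result, -1 = error
        self.result = None
        self.error = None

    def done(self, result=None, error=None):
        assert not self.finished, 'Call already finished'
        # Plain attribute assignment replaces the _replace call.
        self.finished = -1 if error else 1
        self.result = result
        self.error = error
        return self
```

Note that this only removes the third instance created by _replace. The copy made when the object crosses the process boundary is still a separate object, so the id lookup in the main process remains necessary.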
Meanwhile, the original Call instance returned by delegate(...) in the main process calls attach:
delegate(...).attach(on_sleeper_result)
This modifies the global _CALLBACKS dict. The worker processes have no way of knowing about this change; in the worker processes, _CALLBACKS is still the empty dict. So you have to pass the worker's Call instance back to the main process through an mp.Queue, and the main process then uses cb.id to look up the right functions in _CALLBACKS.
So it all works, but it creates three Call instances for every call to delegate, and the code might mislead the uninitiated into thinking that the three Call instances are all the same object. It's more complicated than it needs to be.
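The point about separate instances can be checked in isolation. The snippet below is a minimal sketch using a bare namedtuple with the same fields as the question's Call; the id value 'abc' is made up for the illustration.

```python
from collections import namedtuple

Call = namedtuple('Call', 'id finished result error')

original = Call('abc', 0, None, None)

# _replace never mutates; it builds and returns a new tuple.
updated = original._replace(finished=1, result=['sleeper', 'result'])

# The two objects share the id field but are distinct instances,
# and the original still reports itself as unfinished.
same_object = updated is original
```

Here same_object is False and original.finished is still 0, which is why the main process has to match results to callbacks by id rather than by object identity.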
Have you considered using mp.Pool.apply_async's callback parameter instead?
import multiprocessing as mp
import logging
import time
import collections

_CALLBACKS = collections.defaultdict(list)
logger = mp.log_to_stderr(logging.DEBUG)


def attach(name, func):
    _CALLBACKS[name].append(func)


def delegate(func, *args, **kwargs):
    # Runs in a worker process; returns (id, result-or-exception).
    id = kwargs.pop('id')
    try:
        result = func(*args, **kwargs)
    except Exception as err:
        result = err
    return (id, result)


def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    logger.info('sleeper: will go to sleep for %s secs' % secs)
    time.sleep(secs)
    logger.info('sleeper: woke up - returning result')
    return ['sleeper', 'result']


def callback(r):
    # Called in the main process by the pool when a task finishes.
    id, result = r
    for func in _CALLBACKS[id]:
        func(result)


def on_sleeper_result(r):
    if isinstance(r, Exception):
        logger.error('on_sleeper_result: got error: %s' % r)
    else:
        logger.info('on_sleeper_result: got result: %s' % r)


if __name__ == '__main__':
    pool = mp.Pool()
    pool.apply_async(delegate, args=(sleeper, -3), kwds={'id': 1},
                     callback=callback)
    attach(1, on_sleeper_result)
    pool.apply_async(delegate, args=(sleeper, 3), kwds={'id': 2},
                     callback=callback)
    attach(2, on_sleeper_result)
    while 1:
        logger.info('main: loop')
        time.sleep(1)