I often use Python objects with methods that block until finished, and want to convert these methods to non-blocking versions. I find myself repeating the same pattern quite frequently (it is illustrated in the example below).
This gets the job done, but involves a lot of tedious repetition of code, and doesn't seem very Pythonic to me. Is there a standard, better way to do this?
A highly simplified example to illustrate the pattern I've been using:
    import ctypes
    import Queue
    import multiprocessing as mp

    class Hardware:
        def __init__(
            self,
            other_init_args):
            self.dll = ctypes.cdll.LoadLibrary('hardware.dll')
            self.dll.Initialize(other_init_args)

        def blocking_command(self, arg_1, arg_2, arg_3):
            """
            This command takes a long time to execute, and blocks while it
            executes. However, while it's executing, we have to coordinate
            other pieces of hardware too, so blocking is bad.
            """
            self.dll.Takes_A_Long_Time(arg_1, arg_2, arg_3)

        def change_settings(self, arg_1, arg_2):
            """
            Realistically, there's tons of other functions in the DLL we
            want to expose as methods. For this example, just one.
            """
            self.dll.Change_Settings(arg_1, arg_2)

        def close(self):
            self.dll.Quit()

    def hardware_child_process(
        commands,
        other_init_args):
        hw = Hardware(other_init_args)
        while True:
            cmd, args = commands.recv()
            if cmd == 'start':
                hw.blocking_command(**args)
            elif cmd == 'change_settings':
                hw.change_settings(**args)
            elif cmd == 'quit':
                break
        hw.close()

    class Nonblocking_Hardware:
        """
        This class (hopefully) duplicates the functionality of the
        Hardware class, except now Hardware.blocking_command() doesn't
        block other execution.
        """
        def __init__(
            self,
            other_init_args):
            self.commands, self.child_commands = mp.Pipe()
            self.child = mp.Process(
                target=hardware_child_process,
                args=(self.child_commands,
                      other_init_args))
            self.child.start()

        def blocking_command(self, arg_1, arg_2, arg_3):
            """
            Doesn't block any more!
            """
            self.commands.send(
                ('start',
                 {'arg_1': arg_1,
                  'arg_2': arg_2,
                  'arg_3': arg_3}))

        def change_settings(self, arg_1, arg_2):
            self.commands.send(
                ('change_settings',
                 {'arg_1': arg_1,
                  'arg_2': arg_2}))

        def close(self):
            self.commands.send(('quit', {}))
            self.child.join()
            return None
Backstory:
I use Python to control hardware, usually through closed-source DLLs that I call using ctypes. Frequently, I end up wanting to call functions from the DLL which block until execution finishes, but I don't want my control code to block. For example, I might be synchronizing a camera with illumination using an analog-out card. The camera DLL "snap" function must be called before the analog-out card can send a trigger pulse to the camera, but the "snap" command blocks, preventing me from activating the analog-out card.
I've done something similar by using a metaclass to create non-blocking versions of blocking functions on the object. It allows you to create a non-blocking version of a class just by doing this:
    class NB_Hardware(object):
        __metaclass__ = NonBlockBuilder
        delegate = Hardware
        nb_funcs = ['blocking_command']
I've taken my original implementation, which targeted Python 3 and used a concurrent.futures.ThreadPoolExecutor (I was wrapping blocking I/O calls to make them non-blocking in an asyncio context*), and adapted it to use Python 2 and a concurrent.futures.ProcessPoolExecutor. Here's the implementation of the metaclass along with its helper classes:
    from multiprocessing import cpu_count
    from concurrent.futures import ProcessPoolExecutor

    def runner(self, cb, *args, **kwargs):
        return getattr(self, cb)(*args, **kwargs)

    class _ExecutorMixin():
        """ A Mixin that provides asynchronous functionality.

        This mixin provides methods that allow a class to run
        blocking methods in a ProcessPoolExecutor.
        It also provides methods that attempt to keep the object
        picklable despite having a non-picklable ProcessPoolExecutor
        as part of its state.
        """
        pool_workers = cpu_count()

        def run_in_executor(self, callback, *args, **kwargs):
            """ Runs a function in an Executor.

            Returns a concurrent.futures.Future
            """
            if not hasattr(self, '_executor'):
                self._executor = self._get_executor()
            return self._executor.submit(runner, self, callback, *args, **kwargs)

        def _get_executor(self):
            return ProcessPoolExecutor(max_workers=self.pool_workers)

        def __getattr__(self, attr):
            if (self._obj and hasattr(self._obj, attr) and
                    not attr.startswith("__")):
                return getattr(self._obj, attr)
            raise AttributeError(attr)

        def __getstate__(self):
            # Copy the dict so that pickling does not clear the
            # executor on the live object.
            self_dict = self.__dict__.copy()
            self_dict['_executor'] = None
            return self_dict

        def __setstate__(self, state):
            self.__dict__.update(state)
            self._executor = self._get_executor()

    class NonBlockBuilder(type):
        """ Metaclass for adding non-blocking versions of methods to a class.

        Expects to find the following class attributes:
        nb_funcs - A list containing methods that need non-blocking wrappers
        delegate - The class to wrap (add non-blocking methods to)
        pool_workers - (optional) how many workers to put in the internal pool.

        The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
        hierarchy of cls. This mixin provides methods that allow
        the non-blocking wrappers to do their work.
        """
        def __new__(cls, clsname, bases, dct, **kwargs):
            nbfunc_list = dct.get('nb_funcs', [])
            existing_nbfuncs = set()

            def find_existing_nbfuncs(d):
                for attr in d:
                    if attr.startswith("nb_"):
                        existing_nbfuncs.add(attr)

            # Determine if any bases include the nb_funcs attribute, or
            # if either this class or a base class provides an actual
            # implementation for a non-blocking method.
            find_existing_nbfuncs(dct)
            for b in bases:
                b_dct = b.__dict__
                nbfunc_list.extend(b_dct.get('nb_funcs', []))
                find_existing_nbfuncs(b_dct)

            # Add _ExecutorMixin to bases.
            if _ExecutorMixin not in bases:
                bases += (_ExecutorMixin,)

            # Add non-blocking funcs to dct, but only if a definition
            # is not already provided by dct or one of our bases.
            for func in nbfunc_list:
                nb_name = 'nb_{}'.format(func)
                if nb_name not in existing_nbfuncs:
                    dct[nb_name] = cls.nbfunc_maker(func)

            return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

        def __init__(cls, name, bases, dct):
            """ Properly initialize a non-blocking wrapper.

            Sets pool_workers and delegate on the class, and also
            adds an __init__ method to it that instantiates the
            delegate with the proper context.
            """
            super(NonBlockBuilder, cls).__init__(name, bases, dct)
            pool_workers = dct.get('pool_workers')
            delegate = dct.get('delegate')
            old_init = dct.get('__init__')

            # Search bases for values we care about, if we didn't
            # find them on the child class.
            for b in bases:
                if b is object:  # Skip object
                    continue
                b_dct = b.__dict__
                if not pool_workers:
                    pool_workers = b_dct.get('pool_workers')
                if not delegate:
                    delegate = b_dct.get('delegate')
                if not old_init:
                    old_init = b_dct.get('__init__')

            cls.delegate = delegate

            # If we found a value for pool_workers, set it. If not,
            # ExecutorMixin sets a default that will be used.
            if pool_workers:
                cls.pool_workers = pool_workers

            # Here's the __init__ we want every wrapper class to use.
            # It just instantiates the delegate object.
            def init_func(self, *args, **kwargs):
                # Be sure to call the original __init__, if there
                # was one.
                if old_init:
                    old_init(self, *args, **kwargs)
                if self.delegate:
                    self._obj = self.delegate(*args, **kwargs)
            cls.__init__ = init_func

        @staticmethod
        def nbfunc_maker(func):
            def nb_func(self, *args, **kwargs):
                return self.run_in_executor(func, *args, **kwargs)
            return nb_func
Usage:
    from nb_helper import NonBlockBuilder
    import time

    class Hardware:
        def __init__(self, other_init_args):
            self.other = other_init_args

        def blocking_command(self, arg_1, arg_2, arg_3):
            print("start blocking")
            time.sleep(5)
            return "blocking"

        def normal_command(self):
            return "normal"

    class NBHardware(object):
        __metaclass__ = NonBlockBuilder
        delegate = Hardware
        nb_funcs = ['blocking_command']

    if __name__ == "__main__":
        h = NBHardware("abc")
        print "doing blocking call"
        print h.blocking_command(1,2,3)
        print "done"
        print "doing non-block call"
        x = h.nb_blocking_command(1,2,3)  # This is non-blocking and returns a concurrent.futures.Future
        print h.normal_command()  # You can still use the normal functions, too.
        print x.result()  # Waits for the result from the Future
Output:
    doing blocking call
    start blocking
    < 5 second delay >
    blocking
    done
    doing non-block call
    start blocking
    normal
    < 5 second delay >
    blocking
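Applied to the camera scenario from the question, the Future is what lets the control code start the blocking call, fire the trigger, and only then wait for the image. The following is a minimal Python 3 sketch of that sequencing, not a real camera API: FakeCamera, receive_trigger and acquire_image are hypothetical names, and a ThreadPoolExecutor stands in for the process pool so that the example is self-contained.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeCamera(object):
    """Hypothetical stand-in for a camera whose snap() blocks until triggered."""

    def __init__(self):
        self.armed = threading.Event()
        self._trigger = threading.Event()

    def snap(self, timeout=5.0):
        # Signal that the camera is waiting, then block until the trigger.
        self.armed.set()
        if not self._trigger.wait(timeout):
            raise RuntimeError("no trigger received")
        return "image"

    def receive_trigger(self):
        # Stand-in for the pulse sent by the analog-out card.
        self._trigger.set()


def acquire_image(camera, executor):
    future = executor.submit(camera.snap)  # returns immediately
    camera.armed.wait(5.0)                 # make sure snap() is running
    camera.receive_trigger()               # now the trigger can be sent
    return future.result(timeout=5.0)      # finally wait for the image


executor = ThreadPoolExecutor(max_workers=1)
image = acquire_image(FakeCamera(), executor)
executor.shutdown()
print(image)
```

With the metaclass above, executor.submit(camera.snap) would be replaced by a call to the generated nb_* method, which returns the same kind of Future.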
The one tricky piece for you is making sure Hardware is picklable. You can probably do that by making __getstate__ delete the dll object, and recreate it in __setstate__, similar to what _ExecutorMixin does.
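A minimal sketch of that idea follows, written for Python 3. It is an illustration under assumptions rather than a drop-in fix: PicklableHardware and _load_dll are hypothetical names, and a threading.Lock stands in for the DLL handle only because it, too, cannot be pickled. In real code _load_dll would call ctypes.cdll.LoadLibrary and the DLL's Initialize function.

```python
import pickle
import threading


class PicklableHardware(object):
    """Hypothetical variant of Hardware that survives pickling."""

    def __init__(self, other_init_args):
        self.other_init_args = other_init_args
        self.dll = self._load_dll()

    def _load_dll(self):
        # Stand-in for loading and initializing the real DLL.
        return threading.Lock()

    def __getstate__(self):
        # Work on a copy so the live object keeps its handle.
        state = self.__dict__.copy()
        del state['dll']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Re-open the handle in whichever process unpickles the object.
        self.dll = self._load_dll()


hw = PicklableHardware("abc")
clone = pickle.loads(pickle.dumps(hw))
print(clone.other_init_args)
```

Note that the unpickled copy holds its own freshly loaded handle, so any state the DLL keeps internally is not carried over from the original object.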
You'll also need the Python 2.x backport of concurrent.futures.
Note that there's a bunch of complexity in the metaclass so that it works properly with inheritance, and supports things like providing custom implementations of __init__ and the nb_* methods. For example, something like this is supported:
    from multiprocessing import Lock, RLock
    from multiprocessing.util import register_after_fork

    class AioBaseLock(object):
        __metaclass__ = NonBlockBuilder
        pool_workers = 1
        nb_funcs = ['acquire', 'release']

        def __init__(self, *args, **kwargs):
            self._threaded_acquire = False

            def _after_fork(obj):
                obj._threaded_acquire = False
            register_after_fork(self, _after_fork)

        # Custom implementation; the metaclass will not overwrite it.
        def nb_acquire(self, *args, **kwargs):
            def lock_acquired(fut):
                if fut.result():
                    self._threaded_acquire = True

            # run_in_executor expects the method name, not the bound method.
            out = self.run_in_executor('acquire', *args, **kwargs)
            out.add_done_callback(lock_acquired)
            return out

    class AioLock(AioBaseLock):
        delegate = Lock

    class AioRLock(AioBaseLock):
        delegate = RLock
If you don't need that kind of flexibility, you can simplify the implementation quite a bit:
    class NonBlockBuilder(type):
        """ Metaclass for adding non-blocking versions of methods to a class.

        Expects to find the following class attributes:
        nb_funcs - A list containing methods that need non-blocking wrappers
        delegate - The class to wrap (add non-blocking methods to)
        pool_workers - (optional) how many workers to put in the internal pool.

        The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
        hierarchy of cls. This mixin provides methods that allow
        the non-blocking wrappers to do their work.
        """
        def __new__(cls, clsname, bases, dct, **kwargs):
            nbfunc_list = dct.get('nb_funcs', [])

            # Add _ExecutorMixin to bases.
            if _ExecutorMixin not in bases:
                bases += (_ExecutorMixin,)

            # Add a non-blocking wrapper to dct for every listed method.
            for func in nbfunc_list:
                nb_name = 'nb_{}'.format(func)
                dct[nb_name] = cls.nbfunc_maker(func)

            return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

        def __init__(cls, name, bases, dct):
            """ Properly initialize a non-blocking wrapper.

            Sets pool_workers and delegate on the class, and also
            adds an __init__ method to it that instantiates the
            delegate with the proper context.
            """
            super(NonBlockBuilder, cls).__init__(name, bases, dct)
            pool_workers = dct.get('pool_workers')
            cls.delegate = dct['delegate']

            # If we found a value for pool_workers, set it. If not,
            # ExecutorMixin sets a default that will be used.
            if pool_workers:
                cls.pool_workers = pool_workers

            # Here's the __init__ we want every wrapper class to use.
            # It just instantiates the delegate object.
            def init_func(self, *args, **kwargs):
                self._obj = self.delegate(*args, **kwargs)
            cls.__init__ = init_func

        @staticmethod
        def nbfunc_maker(func):
            def nb_func(self, *args, **kwargs):
                return self.run_in_executor(func, *args, **kwargs)
            return nb_func
* The original code is here, for reference.
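If even the simplified metaclass is more machinery than needed, the same idea (generate an nb_* method per blocking method and hand the call to an executor) can be sketched as a plain wrapper class. This is a Python 3 illustration under assumptions, not part of nb_helper: NonBlockingWrapper is a hypothetical name, and it uses a ThreadPoolExecutor so that nothing has to be pickled. Threads are a reasonable fit when the blocking call releases the GIL, which ctypes does around foreign function calls made through cdll.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class Hardware(object):
    def blocking_command(self, arg_1):
        time.sleep(0.1)  # stand-in for a slow DLL call
        return arg_1 * 2

    def normal_command(self):
        return "normal"


class NonBlockingWrapper(object):
    """Hypothetical helper that adds nb_* methods to a wrapped object."""

    def __init__(self, obj, nb_funcs, max_workers=1):
        self._obj = obj
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        for name in nb_funcs:
            setattr(self, 'nb_' + name, self._make_nb(name))

    def _make_nb(self, name):
        def nb_func(*args, **kwargs):
            method = getattr(self._obj, name)
            return self._executor.submit(method, *args, **kwargs)
        return nb_func

    def __getattr__(self, attr):
        # Only reached when normal lookup fails; delegate public names.
        if attr.startswith('_'):
            raise AttributeError(attr)
        return getattr(self._obj, attr)

    def close(self):
        self._executor.shutdown(wait=True)


hw = NonBlockingWrapper(Hardware(), ['blocking_command'])
future = hw.nb_blocking_command(21)  # returns a Future right away
print(hw.normal_command())           # delegated to the wrapped object
print(future.result())               # waits for the blocking call to finish
hw.close()
```

With max_workers=1 the wrapped object only ever sees one call at a time from the pool, which matters if the underlying DLL is not safe to call concurrently.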
One method I've used to launch class methods asynchronously is to create a pool and call a few function aliases with apply_async instead of directly calling the class methods.
Say you have an even simpler version of your classes:
    class Hardware:
        def __init__(self, stuff):
            self.stuff = stuff
            return

        def blocking_command(self, arg1):
            self.stuff.call_function(arg1)
            return
At the top level of your module, define a new function that looks like this:
    def _blocking_command(Hardware_obj, arg1):
        # Calling the bound method already supplies self, so pass only arg1.
        return Hardware_obj.blocking_command(arg1)
Since the class and this "alias" function are both defined at the top level of the module, they are pickleable and you can kick it off using the multiprocessing library:
    import multiprocessing

    hw_obj = Hardware(stuff)
    pool = multiprocessing.Pool()
    results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1))
The results of your function calls will be available in the results object. I like this approach because it uses a relatively small amount of code to make parallelization a lot easier. Specifically, it only adds a few two-line functions instead of any classes, and there are no extra imports required besides multiprocessing.
Notes:
Don't use this for methods that need to modify the object's attributes, because each worker process operates on a pickled copy of the object and its changes never reach the original. It works fine if it is used after all of the object's attributes have been set, effectively treating them as "read-only".
You can also use this approach inside a class method to launch other class methods; you just have to explicitly pass "self". This could allow you to move your floating "hardware_child_process" function into the class. It would still act as a dispatcher of a bunch of asynchronous processes, but it would centralize that functionality in your Hardware class.
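The "read-only" caveat in the first note can be seen without starting any processes. The sketch below (Python 3) imitates what happens to an argument sent to a worker: it is pickled and the worker gets a copy. call_on_pickled_copy and change_setting are hypothetical names used only for this illustration; a real pool does the pickling for you.

```python
import pickle


class Hardware(object):
    def __init__(self, setting):
        self.setting = setting

    def change_setting(self, value):
        self.setting = value
        return self.setting


def _change_setting(hardware_obj, value):
    # Module-level alias, in the style of _blocking_command above.
    return hardware_obj.change_setting(value)


def call_on_pickled_copy(func, obj, *args):
    # Imitates a worker process: it receives an unpickled copy of the
    # argument, never the caller's original object.
    worker_copy = pickle.loads(pickle.dumps(obj))
    return func(worker_copy, *args)


hw = Hardware(setting=1)
result = call_on_pickled_copy(_change_setting, hw, 2)
print(result)      # value computed on the copy
print(hw.setting)  # the caller's object is unchanged
```

Return values do travel back to the caller, so a method that needs to report new state can return it and let the caller apply it to the original object.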