 

What's a Pythonic way to make a non-blocking version of an object?

I often use Python objects with methods that block until finished, and want to convert these methods to non-blocking versions. I find myself executing the following pattern quite frequently:

  1. Define the object
  2. Define a function which creates an instance of the object, and parses commands to call methods of the object
  3. Define a "parent" object which creates a subprocess running the function defined in step 2, and which duplicates the methods of the original object.

This gets the job done, but involves a lot of tedious repetition of code, and doesn't seem very Pythonic to me. Is there a standard, better way to do this?

A highly simplified example to illustrate the pattern I've been using:

import ctypes
import multiprocessing as mp

class Hardware:
    def __init__(
        self,
        other_init_args):
        self.dll = ctypes.cdll.LoadLibrary('hardware.dll')
        self.dll.Initialize(other_init_args)

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        This command takes a long time to execute, and blocks while it
        executes. However, while it's executing, we have to coordinate
        other pieces of hardware too, so blocking is bad.
        """
        self.dll.Takes_A_Long_Time(arg_1, arg_2, arg_3)

    def change_settings(self, arg_1, arg_2):
        """
        Realistically, there's tons of other functions in the DLL we
        want to expose as methods. For this example, just one.
        """
        self.dll.Change_Settings(arg_1, arg_2)

    def close(self):
        self.dll.Quit()

def hardware_child_process(
    commands,
    other_init_args):
    hw = Hardware(other_init_args)
    while True:
        cmd, args = commands.recv()
        if cmd == 'start':
            hw.blocking_command(**args)
        elif cmd == 'change_settings':
            hw.change_settings(**args)
        elif cmd == 'quit':
            break
    hw.close()

class Nonblocking_Hardware:
    """
    This class (hopefully) duplicates the functionality of the
    Hardware class, except now Hardware.blocking_command() doesn't
    block other execution.
    """
    def __init__(
        self,
        other_init_args):
        self.commands, self.child_commands = mp.Pipe()
        self.child = mp.Process(
            target=hardware_child_process,
            args=(self.child_commands,
                  other_init_args))
        self.child.start()

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        Doesn't block any more!
        """
        self.commands.send(
            ('start',
             {'arg_1': arg_1,
              'arg_2': arg_2,
              'arg_3': arg_3}))

    def change_settings(self, arg_1, arg_2):
        self.commands.send(
            ('change_settings',
             {'arg_1': arg_1,
              'arg_2': arg_2}))

    def close(self):
        self.commands.send(('quit', {}))
        self.child.join()
        return None

Backstory:

I use Python to control hardware, usually through closed-source DLLs that I call using ctypes. Frequently, I end up wanting to call functions from the DLL which block until execution finishes, but I don't want my control code to block. For example, I might be synchronizing a camera with illumination using an analog-out card. The camera DLL "snap" function must be called before the analog-out card can send a trigger pulse to the camera, but the "snap" command blocks, preventing me from activating the analog-out card.

Andrew asked Sep 30 '22

2 Answers

I've done something similar by using a metaclass to create non-blocking versions of blocking functions on the object. It allows you to create a non-blocking version of a class just by doing this:

class NB_Hardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']

I've taken my original implementation, which targeted Python 3 and used a concurrent.futures.ThreadPoolExecutor (I was wrapping blocking I/O calls to make them non-blocking in an asyncio context*), and adapted it to use Python 2 and a concurrent.futures.ProcessPoolExecutor. Here's the implementation of the metaclass along with its helper classes:

from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor

def runner(self, cb, *args, **kwargs):
    return getattr(self, cb)(*args, **kwargs)

class _ExecutorMixin(object):
    """ A Mixin that provides asynchronous functionality.

    This mixin provides methods that allow a class to run
    blocking methods in a ProcessPoolExecutor.
    It also provides methods that attempt to keep the object
    picklable despite having a non-picklable ProcessPoolExecutor
    as part of its state.

    """
    pool_workers = cpu_count()

    def run_in_executor(self, callback, *args, **kwargs):
        """  Runs a function in an Executor.

        Returns a concurrent.Futures.Future

        """
        if not hasattr(self, '_executor'):
            self._executor = self._get_executor()

        return self._executor.submit(runner, self, callback, *args, **kwargs)

    def _get_executor(self):
        return ProcessPoolExecutor(max_workers=self.pool_workers)

    def __getattr__(self, attr):
        # Guard against infinite recursion if _obj hasn't been set yet
        # (e.g. during unpickling, before __setstate__ has run).
        if attr == '_obj':
            raise AttributeError(attr)
        if (self._obj and hasattr(self._obj, attr) and
                not attr.startswith("__")):
            return getattr(self._obj, attr)
        raise AttributeError(attr)

    def __getstate__(self):
        # Copy the dict so pickling doesn't wipe out the live object's executor.
        self_dict = self.__dict__.copy()
        self_dict['_executor'] = None
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._executor = self._get_executor()

class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.  

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])
        existing_nbfuncs = set()

        def find_existing_nbfuncs(d):
            for attr in d:
                if attr.startswith("nb_"):
                    existing_nbfuncs.add(attr)

        # Determine if any bases include the nb_funcs attribute, or
        # if either this class or a base class provides an actual
        # implementation for a non-blocking method.
        find_existing_nbfuncs(dct)
        for b in bases:
            b_dct = b.__dict__
            nbfunc_list.extend(b_dct.get('nb_funcs', []))
            find_existing_nbfuncs(b_dct)

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add non-blocking funcs to dct, but only if a definition
        # is not already provided by dct or one of our bases.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            if nb_name not in existing_nbfuncs:
                dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        delegate = dct.get('delegate')
        old_init = dct.get('__init__')
        # Search bases for values we care about, if we didn't
        # find them on the child class.
        for b in bases:
            if b is object:  # Skip object
                continue
            b_dct = b.__dict__
            if not pool_workers:
                pool_workers = b_dct.get('pool_workers')
            if not delegate:
                delegate = b_dct.get('delegate')
            if not old_init:
                old_init = b_dct.get('__init__')

        cls.delegate = delegate

        # If we found a value for pool_workers, set it. If not,
        # ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            # Be sure to call the original __init__, if there
            # was one.
            if old_init:
                old_init(self, *args, **kwargs)

            if self.delegate:
                self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func

Usage:

from nb_helper import NonBlockBuilder
import time


class Hardware:
    def __init__(self, other_init_args):
        self.other = other_init_args

    def blocking_command(self, arg_1, arg_2, arg_3):
        print("start blocking")
        time.sleep(5)
        return "blocking"

    def normal_command(self):
        return "normal"


class NBHardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']


if __name__ == "__main__":
    h = NBHardware("abc")
    print "doing blocking call"
    print h.blocking_command(1,2,3)
    print "done"
    print "doing non-block call"
    x = h.nb_blocking_command(1,2,3)  # This is non-blocking and returns concurrent.future.Future
    print h.normal_command()  # You can still use the normal functions, too.
    print x.result()  # Waits for the result from the Future

Output:

doing blocking call
start blocking
< 5 second delay >
blocking
done
doing non-block call
start blocking
normal
< 5 second delay >
blocking

The one tricky piece for you is making sure Hardware is picklable. You can probably do that by making __getstate__ delete the dll object, and recreate it in __setstate__, similar to what _ExecutorMixin does.
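For instance, here's a minimal sketch of that idea. The `_load_dll` helper and the plain-object stand-in for the DLL handle are my inventions for illustration; real code would call ctypes.cdll.LoadLibrary there instead:

```python
import pickle

class Hardware(object):
    def __init__(self, other_init_args):
        self.other_init_args = other_init_args
        self._load_dll()

    def _load_dll(self):
        # Real code would do: self.dll = ctypes.cdll.LoadLibrary('hardware.dll')
        self.dll = object()  # stand-in for the non-picklable DLL handle

    def __getstate__(self):
        state = self.__dict__.copy()
        state['dll'] = None  # drop the non-picklable handle before pickling
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._load_dll()  # recreate the handle on the other side

hw = pickle.loads(pickle.dumps(Hardware('args')))
print(hw.other_init_args)  # the plain state survives; the DLL handle is rebuilt
```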

You'll also need the Python 2.x backport of concurrent.futures (the "futures" package on PyPI).

Note that there's a bunch of complexity in the metaclass so that the wrappers work properly with inheritance, and support things like providing custom implementations of __init__ and the nb_* methods. For example, something like this is supported:

class AioBaseLock(object):
    __metaclass__ = NonBlockBuilder
    pool_workers = 1
    nb_funcs = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        self._threaded_acquire = False
        def _after_fork(obj):
            obj._threaded_acquire = False
        # register_after_fork comes from multiprocessing.util
        register_after_fork(self, _after_fork)

    def nb_acquire(self, *args, **kwargs):
        def lock_acquired(fut):
            if fut.result():
                self._threaded_acquire = True

        out = self.run_in_executor('acquire', *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out

class AioLock(AioBaseLock):
    delegate = Lock


class AioRLock(AioBaseLock):
    delegate = RLock

If you don't need that kind of flexibility, you can simplify the implementation quite a bit:

class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.  

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add non-blocking funcs to dct, but only if a definition
        # is not already provided by dct or one of our bases.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        cls.delegate = dct['delegate']

        # If we found a value for pool_workers, set it. If not,
        # ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func
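For what it's worth, here's a self-contained demo of the simplified version, ported to Python 3 class syntax, with a ThreadPoolExecutor standing in for the ProcessPoolExecutor so the sketch sidesteps pickling entirely (the mixin here is trimmed accordingly; it's not the full _ExecutorMixin above):

```python
from concurrent.futures import ThreadPoolExecutor
import time

class _ExecutorMixin:
    pool_workers = 2

    def run_in_executor(self, callback, *args, **kwargs):
        # Lazily create the pool, then run the delegate's method in it.
        if not hasattr(self, '_executor'):
            self._executor = ThreadPoolExecutor(max_workers=self.pool_workers)
        return self._executor.submit(getattr(self._obj, callback), *args, **kwargs)

class NonBlockBuilder(type):
    def __new__(cls, clsname, bases, dct):
        # Generate an nb_* wrapper for each listed method.
        for func in dct.get('nb_funcs', []):
            dct['nb_{}'.format(func)] = cls.nbfunc_maker(func)
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)
        return super().__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        super().__init__(name, bases, dct)
        cls.delegate = dct['delegate']

        def init_func(self, *args, **kwargs):
            self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func

class Hardware:
    def __init__(self, x):
        self.x = x

    def blocking_command(self, delay):
        time.sleep(delay)  # stand-in for a slow DLL call
        return self.x

class NBHardware(metaclass=NonBlockBuilder):
    delegate = Hardware
    nb_funcs = ['blocking_command']

h = NBHardware(42)
fut = h.nb_blocking_command(0.1)  # returns a Future immediately
print(fut.result())               # -> 42
```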

* The original code is here, for reference.

dano answered Oct 03 '22

One method I've used to launch class methods asynchronously is to create a pool and call a few function aliases with apply_async instead of directly calling the class methods.

Say you have an even simpler version of your classes:

class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff
        return

    def blocking_command(self, arg1):
        self.stuff.call_function(arg1)
        return

At the top level of your module, define a new function that looks like this:

def _blocking_command(Hardware_obj, arg1):
    return Hardware_obj.blocking_command(arg1)

Since the class and this "alias" function are both defined at the top level of the module, they are picklable, and you can kick things off using the multiprocessing library:

import multiprocessing

hw_obj = Hardware(stuff)
pool = multiprocessing.Pool()

results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1))

The results of your function calls will be available in the results object. I like this approach because it uses a relatively small amount of code to make parallelization a lot easier. Specifically, it only adds a few two-line functions instead of any classes, and there are no extra imports required besides multiprocessing.

Notes:

  1. Don't use this for methods that need to modify the object's attributes, but it works fine if used after all of the object's attributes have been set, effectively treating them as "read-only".

  2. You can also use this approach inside a class method to launch other class methods; you just have to pass "self" explicitly. This would allow you to move your floating "hardware_child_process" function into the class. It would still act as a dispatcher for a bunch of asynchronous processes, but it would centralize that functionality in your Hardware class.

skrrgwasme answered Oct 03 '22