 

Share async-await coroutine based complex object across multiprocess

I know that, generally, objects should not be shared across processes, and I know the issues that can arise from doing so. But my requirement is such that it is necessary to do this.

I have a complex object with async-await coroutines in it, and a function that runs a long-running operation on this object in a separate process of its own. Now I want to run an IPython shell in the main process and operate on this complex object while that long-running operation is running in the other process.

To share this complex object across processes, I have tried the multiprocessing BaseManager approach that I came across on SO:

import multiprocessing
import multiprocessing.managers as m


class MyManager(m.BaseManager):
    pass

MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(
    target=long_running_process,
    args=(c,),
)

but this gives the following error:

Unserializable message: Traceback (most recent call last):
  File "/usr/3.6/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
    send(msg)
  File "/usr/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects

It's not working because there are coroutines in the object. I can't think of a better solution to make it work, and I'm stuck on it.

If it were not Python, I would have spawned a thread for the long-running operation and still been able to operate on the object.

If I am not wrong, this should be a common pattern in multiprocess applications: run a background process while the main process only performs read-only operations on the shared object, as in my case, without modifying it. How is this generally done?

How are complex objects that can't be pickled shared across processes?

asked Feb 01 '18 by Amit Tripathi

1 Answer

Running coroutines cannot be automatically shared among processes, because a coroutine runs inside a particular event loop in the process that owns the async class. A coroutine has state that cannot be pickled, and even if it could be, it wouldn't make sense outside the context of its event loop.
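
To see concretely why the manager-based attempt fails, note that pickling any coroutine object raises the TypeError shown in the question's traceback. A minimal reproduction using only the standard library:

import asyncio, pickle

async def coro():
    await asyncio.sleep(1)

c = coro()           # create (but do not run) a coroutine object
try:
    pickle.dumps(c)  # multiprocessing serializes arguments the same way
except TypeError as e:
    print(e)         # e.g. "cannot pickle 'coroutine' object" (wording varies by Python version)
c.close()            # avoid the "coroutine was never awaited" warning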

What you can do is create a callback-based adapter for your async class, with each coroutine method being represented by a callback-based method with the semantics of "start doing X and call this function when done". Provided the callback is multiprocessing-aware, these operations can be called from other processes. You could then start an event loop in each process and create a coroutine facade over the proxied callback-based calls.

For example, consider a trivial async class:

import asyncio, os

class Async:
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, os.getpid())
            await asyncio.sleep(.2)
        return s

A callback-based adapter can use the public asyncio API to convert the repeat coroutine into a classic asynchronous function in the JavaScript "callback hell" style:

class CallbackAdapter:
    def repeat_start(self, n, s, on_success):
        fut = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        fut.add_done_callback(lambda _f: on_success(fut.result()))

(The conversion can be automated; the manually written code above just shows the concept.)
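
For illustration, the automation might look roughly like the sketch below. GenericCallbackAdapter is a name invented here, not part of the original answer, and the sketch glosses over how a multiprocessing proxy would expose such dynamically generated methods:

import asyncio

class GenericCallbackAdapter:
    # Hypothetical sketch: expose a "<method>_start(*args, on_success)" callback
    # variant for every coroutine method of the wrapped object.
    def __init__(self, obj, loop):
        self._async = obj   # the async object being wrapped
        self._loop = loop   # an event loop running in a background thread

    def __getattr__(self, name):
        if not name.endswith('_start'):
            raise AttributeError(name)
        coro_fn = getattr(self._async, name[:-len('_start')])

        def starter(*args):
            *call_args, on_success = args   # last positional argument is the callback
            fut = asyncio.run_coroutine_threadsafe(
                coro_fn(*call_args), self._loop)
            fut.add_done_callback(lambda _f: on_success(fut.result()))
        return starter

With such an adapter, adapter.repeat_start(3, 'hello', print) would schedule Async.repeat(3, 'hello') on the background loop and print the result when it completes.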

CallbackAdapter can be registered with multiprocessing, so different processes can start the adapter's method (and hence the original async coroutine) through multiprocessing-provided proxies. This only requires that the callback passed as on_success be multiprocessing-friendly.
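
What "multiprocessing-friendly" means in practice is that the callback must survive a trip through a manager proxy call, for example the bound put method of a manager.Queue() proxy, which is the trick remote_event_future uses in the full listing below. A standalone sketch of that idea, with names invented here for illustration:

import multiprocessing, os

def worker(on_success):
    # on_success is the put method of a queue proxy created in the parent;
    # invoking it here delivers the value back across the process boundary.
    on_success('hello from pid %d' % os.getpid())

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    results = manager.Queue()
    p = multiprocessing.Process(target=worker, args=(results.put,))
    p.start()
    print(results.get())   # blocks until the child invokes the callback
    p.join()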

As a final step, one could go full circle and create an async adapter for the callback-based API (!), start an event loop in the other process as well, and also make use of asyncio and async def. This adapter-for-adapter class would sport a fully functional repeat coroutine that is effectively proxying the original Async.repeat coroutine without ever trying to pickle coroutine state.

Here is a sample implementation of the above approach:

import asyncio, multiprocessing.managers, threading, os

class Async:
    # The async class we are bridging.  This class is unaware of multiprocessing
    # or of any of the code that follows.
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, 'pid', os.getpid())
            await asyncio.sleep(.2)
        return s


def start_asyncio_thread():
    # Since the manager controls the main thread, we have to spin up the event
    # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
    # submit stuff to the loop.
    setup_done = threading.Event()
    loop = None
    def loop_thread():
        nonlocal loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        setup_done.set()
        loop.run_forever()
    threading.Thread(target=loop_thread).start()
    setup_done.wait()
    return loop

class CallbackAdapter:
    _loop = None

    # the callback adapter to the async class, also running in the
    # worker process
    def __init__(self, obj):
        self._async = obj
        if CallbackAdapter._loop is None:
            CallbackAdapter._loop = start_asyncio_thread()

    def repeat_start(self, n, s, on_success):
        # Submit a coroutine to the event loop and obtain a Task/Future.  This
        # is normally done with loop.create_task, but repeat_start will be
        # called from the main thread, owned by the multiprocessing manager,
        # while the event loop will run in a separate thread.
        future = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        # We could propagate exceptions by accepting an additional on_error
        # callback, and nesting fut.result() in a try/except that decides
        # whether to call on_success or on_error.
        future.add_done_callback(lambda _f: on_success(future.result()))


def remote_event_future(manager):
    # Return a function/future pair that can be used to locally monitor an
    # event in another process.
    #
    # The returned function and future have the following property: when the
    # function is invoked, possibly in another process, the future completes.
    # The function can be passed as a callback argument to a multiprocessing
    # proxy object and therefore invoked by a different process.
    loop = asyncio.get_event_loop()
    result_pipe = manager.Queue()
    future = loop.create_future()
    def _wait_for_remote():
        result = result_pipe.get()
        loop.call_soon_threadsafe(future.set_result, result)
    t = threading.Thread(target=_wait_for_remote)
    t.start()
    return result_pipe.put, future


class AsyncAdapter:
    # The async adapter for a callback-based API, e.g. the CallbackAdapter.
    # Designed to run in a different process and communicate to the callback
    # adapter via a multiprocessing proxy.
    def __init__(self, cb_proxy, manager):
        self._cb = cb_proxy
        self._manager = manager

    async def repeat(self, n, s):
        set_result, future = remote_event_future(self._manager)
        self._cb.repeat_start(n, s, set_result)
        return await future


class CommManager(multiprocessing.managers.SyncManager):
    pass

CommManager.register('Async', Async)
CommManager.register('CallbackAdapter', CallbackAdapter)


def get_manager():
    manager = CommManager()
    manager.start()
    return manager

def other_process(manager, cb_proxy):
    print('other_process (pid %d)' % os.getpid())
    aadapt = AsyncAdapter(cb_proxy, manager)
    loop = asyncio.get_event_loop()
    # Create two coroutines printing different messages, and gather their
    # results.
    results = loop.run_until_complete(asyncio.gather(
        aadapt.repeat(3, 'message A'),
        aadapt.repeat(2, 'message B')))
    print('coroutine results (pid %d): %s' % (os.getpid(), results))
    print('other_process (pid %d) done' % os.getpid())

def start_other_process(loop, manager, async_proxy):
    cb_proxy = manager.CallbackAdapter(async_proxy)
    other = multiprocessing.Process(target=other_process,
                                    args=(manager, cb_proxy,))
    other.start()
    return other

def main():
    loop = asyncio.get_event_loop()
    manager = get_manager()
    async_proxy = manager.Async()
    # Create two external processes that drive coroutines in our event loop.
    # Note that all messages are printed with the same PID.
    start_other_process(loop, manager, async_proxy)
    start_other_process(loop, manager, async_proxy)
    loop.run_forever()

if __name__ == '__main__':
    main()

The code runs correctly on Python 3.5, but fails on 3.6 and 3.7 due to a bug in multiprocessing.

answered Nov 02 '22 by user4815162342