Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can't pickle coroutine objects when ProcessPoolExecutor is used in class

I'm trying to get asyncio work with subprocesses and limitations. I've accomplish this in functional way, but when I tried to implement same logic in opp style several problems showd up. Mostly Can't pickle coroutine/generator errors. I tracked some of theese, but not all

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint

class async_runner(object):
    def __init__(self):
        self.futures = [] # container to store current futures
        self.futures_total = []
        self.loop = asyncio.get_event_loop() # main event_loop
        self.executor = ProcessPoolExecutor()
        self.limit = 1

    def run(self, func, *args):
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()
    def limit_futures(self, futures, limit):
        self.futures_total = iter(futures)
        self.futures = [future for future in islice(self.futures_total,0,limit)]
        async def first_to_finish():
            while True:
                await asyncio.sleep(0)
                for f in self.futures:
                    if f.done(): # here raised TypeError: can't pickle coroutine objects
                        print(f.done())
                        self.futures.remove(f)
                        try:
                            #newf = next(self.futures_total)
                            #self.futures.append(newf)
                            print(f.done())
                        except StopIteration as e:
                            pass
                        return f.result()
        while len(self.futures) > 0:
            yield first_to_finish()
    async def run_limited(self, func, args, limit):
        self.limit = int(limit)
        self.futures_total = (self.loop.run_in_executor(self.executor, self.run, func, x) for x in range(110000,119990))
        for ret in self.limit_futures(self.futures_total, 4): # limitation - 4 per all processes
            await ret
    def set_execution(self, func, args, limit):
        ret = self.loop.run_until_complete(self.run_limited(func, args, limit))
        return ret
async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(randint(1,3))
    print('finishing ', x)
    return x

runner = async_runner()
ret = runner.set_execution(asy,urls,2)
print(ret)

But this works fine:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
import time

async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(1)
    print('finishing ', x)
    return x

def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        ret = loop.run_until_complete(coro)
        #print(ret)
        return ret
    finally:
        loop.close()
def limit_futures(futures, limit):
    futures_sl = [
        c for c in islice(futures, 0, limit)
    ]
    print(len(futures_sl))
    async def first_to_finish(futures):
        while True:
            await asyncio.sleep(0)
            for f in futures_sl:
                if f.done():
                    futures_sl.remove(f)
                    try:
                        newf = next(futures)
                        futures_sl.append(newf)
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures_sl) > 0:
        yield first_to_finish(futures)
async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor()
    futures = (loop.run_in_executor(executor, run, asy, x) for x in range(110000,119990))
    '''
    CASE balls to the wall!
    await asyncio.gather(*futures)
    '''
    for ret in limit_futures(futures, 4): # limitation - 4 per all processes
        await ret

if __name__ == '__main__':
    start = time.time()
    '''
    # CASE single
    ret = [asy(x) for x in range(510000,510040)]
    exit()
    '''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Elapsed time: {:.3f} sec".format(time.time() - start))

I've cant understand why multiprocessing module trying to pickle anything only when objects are in use, but not in any scenario

like image 588
Alex Smith Avatar asked Dec 05 '17 20:12

Alex Smith


1 Answers

The reason why multiprocessing needs to pickle the async_runner instance is because self.runner is a bound method, meaning that it "contains" the async_runner instance.

Since you're not actually using self in the run method, you can just make it a staticmethod to avoid this problem.

like image 104
greschd Avatar answered Nov 01 '22 05:11

greschd