
Asyncio in coroutine RuntimeError: no running event loop

I'm writing multi-process code, which runs perfectly in Python 3.7. I want one of the parallel processes to execute an IO-bound step that takes forever using AsyncIO, in order to get better performance, but I have not been able to get it to run.

Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed)

The method in particular runs as expected using multithreading, which is what I want to replace with AsyncIO.

I have googled and tried creating the loop in the main() function, and now only in the intended coroutine; I have looked at examples and read about this new async way of getting things done, with no results so far.

The following is the app.py code, which is executed with: python app.py

import sys
import traceback
import logging
import asyncio
import pdb  # needed for pdb.post_mortem() below

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

Here is the code where the class in question is defined (method bodies elided):

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

The method containing the async code is here:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

And the method that calls this particular sequence, including the async method, is:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

These are the errors I get:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)

asked Nov 08 '19 22:11 by Francisco

1 Answer

From the traceback, it looks like you are trying to add tasks to an event loop that is not running:

/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '._get_filter_collateral..read_task' was never awaited

The loop was just created and is not running yet, therefore asyncio is unable to attach tasks to it.

The following example reproduces the same result: adding tasks and then trying to await all of them:

import asyncio
async def func(num):
    print('My name is func {0}...'.format(num))

loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

This results in:

Traceback (most recent call last):
  File "C:/tmp/stack_overflow.py", line 42, in <module>
    tasks.append(asyncio.create_task(func(i)))
  File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited

Nonetheless, the solution is pretty simple: you just need to add the tasks to the created loop yourself, instead of asking asyncio to do it. The only change needed is in the following line:

tasks.append(asyncio.create_task(func(i)))

Change the creation of the task from asyncio to the newly created loop. You can do this because it is your own loop, unlike asyncio.create_task, which looks for a currently running one.

So the new line should look like this:

tasks.append(loop.create_task(func(i)))
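Putting that one-line fix into the earlier example gives the following sketch. It uses asyncio.new_event_loop() rather than get_event_loop() (an assumption on my part, to avoid the deprecation warning newer Python versions raise when no loop is running), and func returns its argument so the results can be inspected:

```python
import asyncio

async def func(num):
    print('My name is func {0}...'.format(num))
    return num

# Create our own loop explicitly; it does not need to be running yet
loop = asyncio.new_event_loop()
tasks = list()
for i in range(5):
    # Schedule on *our* loop; unlike asyncio.create_task, this works
    # even though the loop has not started running
    tasks.append(loop.create_task(func(i)))
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```

After run_until_complete returns, all five tasks are in the done set and each task's result() is available.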

Another solution is to run an async function and create the tasks there, for example (because that loop is already running, asyncio is able to attach tasks to it):

import asyncio
async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))

loop = asyncio.get_event_loop()
async def create_tasks_func():
    tasks = list()
    for i in range(5):
        tasks.append(asyncio.create_task(func(i)))
    await asyncio.wait(tasks)
loop.run_until_complete(create_tasks_func())
loop.close()

This simple change results in:

Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
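Since the question targets Python 3.7, a third option worth noting (a sketch of mine, not part of the original answer): asyncio.run() creates a fresh event loop, runs the coroutine, and closes the loop afterwards. That pattern also fits the asker's setup, where each mp.Process child needs its own loop and must not share one with the parent. The names read_task and gather_all below are illustrative, not the question's real code:

```python
import asyncio

async def read_task(content):
    # Stand-in for the question's per-file CSV parsing
    await asyncio.sleep(0.01)
    return content.upper()

async def gather_all(files):
    # asyncio.create_task works here because the loop is running
    tasks = [asyncio.create_task(read_task(f)) for f in files]
    return await asyncio.gather(*tasks)

# asyncio.run creates, runs, and closes a fresh event loop --
# convenient inside a multiprocessing child process
results = asyncio.run(gather_all(['a', 'b', 'c']))
print(results)  # ['A', 'B', 'C']
```

asyncio.gather preserves the order of its arguments, so the results come back in input order even though the tasks run concurrently.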
answered Sep 22 '22 00:09 by Amiram