I'm writing multi-process code, which runs perfectly in Python 3.7. However, I want one of the parallel processes to execute an I/O task that takes forever using AsyncIO in order to get better performance, but I have not been able to get it to run.
Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed)
The method in particular runs as expected using multithreading, which is what I want to replace with AsyncIO.
I have googled and tried looping in the main() function and now only in the intended coroutine, have looked at examples, and have read about this new async way of getting things done, with no results so far.
The following is the app.py code, which is executed with: python app.py
import sys
import traceback
import logging
import asyncio
from config import DEBUG
from config import log_config
from <some-module> import <some-class>
# `import logging` alone does not load the `logging.config` submodule, so it is
# imported explicitly before `dictConfig` is used.
import logging.config

if DEBUG:
    # Debug runs use the dictionary configuration returned by log_config().
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(relativeCreated)6d %(threadName)s %(message)s',
    )

logger = logging.getLogger(__name__)
def main():
try:
<some> = <some-class>([
'some-data1.csv',
'some-data2.csv'
])
<some>.run()
except:
traceback.print_exc()
pdb.post_mortem()
sys.exit(0)
if __name__ == '__main__':
    # main() is a regular function, not a coroutine: `asyncio.run(main())`
    # would execute it and then raise ValueError because it is handed None.
    main()
Here is the code where I have the given class defined
_sql_client = SQLServer()
_blob_client = BlockBlobStore()
_keys = KeyVault()
_data_source = _keys.fetch('some-data')
# Multiprocessing
_manager = mp.Manager()
_ns = _manager.Namespace()
def __init__(self, list_of_collateral_files: list) -> None:
@timeit
def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _calc_bid_per_path(self) -> None:
@timeit
def run(self) -> None:
The method containing the async code is here:
def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:
    """Download the listed blobs, combine them into one frame and store it on ``ns.dfs``.

    As written this fails with ``RuntimeError: no running event loop`` at the
    ``asyncio.create_task`` call below, so ``ns.dfs`` is never assigned.
    """
    all_files = self._blob_client.download_blobs(self._list_of_blob_files)
    _all_dfs = pd.DataFrame()

    async def read_task(file_: str) -> None:
        # Parses one downloaded item as CSV and appends it to the shared frame.
        # NOTE(review): annotated as str, yet `.content` is read from it —
        # presumably a blob object; confirm against download_blobs().
        # There is no `await` in this body, so each call runs to completion
        # without yielding to the loop.
        nonlocal _all_dfs
        df = pd.read_csv(StringIO(file_.content))
        _all_dfs = _all_dfs.append(df, sort=False)

    tasks = []
    loop = asyncio.new_event_loop()
    for file_ in all_files:
        # asyncio.create_task() requires an already-running loop; `loop` has
        # only been created, not started, so this call raises RuntimeError.
        tasks.append(asyncio.create_task(read_task(file_)))
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    # Recode TOU: 'OFF' becomes 'OFFPEAK', every other value becomes 'ONPEAK'.
    _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
    ns.dfs = _all_dfs
And the method that calls the particular sequence and this async method is:
def run(self) -> None:
    """Run the three extract steps in parallel processes, then the remaining steps in order.

    The hours and bids processes are joined before the merge step; the
    collateral process is joined only afterwards, right before the monthly
    collateral step that follows it.
    """
    targets = (
        self._get_filter_collateral,
        self._get_hours,
        self._load_original_bids,
    )
    extract = [mp.Process(target=target, args=(self._ns, )) for target in targets]

    # Start the parallel processes
    for process in extract:
        process.start()

    collateral_proc, hours_proc, bids_proc = extract

    # Wait for the two database processes to end
    hours_proc.join()
    bids_proc.join()

    # Merge both database results
    self._merge_bids_with_hours(self._ns)

    collateral_proc.join()
    self._get_collaterial_per_month(self._ns)
    self._calc_bid_per_path()
    self._save_reports()
    self._upload_data()
These are the errors I get:
Process Process-2:
Traceback (most recent call last):
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
tasks.append(asyncio.create_task(read_task(file_)))
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
File "app.py", line 25, in main
caiso.run()
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
self._get_collaterial_per_month(self._ns)
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
credit_margin = ns.dfs
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
return callmethod('__getattribute__', (key,))
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)
As the traceback log shows, it looks like you are trying to add tasks to an event loop that is not running.
/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '._get_filter_collateral..read_task' was never awaited
The loop was just created and is not running yet, therefore asyncio
is unable to attach tasks to it.
The following example will reproduce the same results, adding tasks and then trying to await
for all of them to finish:
import asyncio


async def func(num):
    """Print a greeting that includes ``num``."""
    print('My name is func {0}...'.format(num))


loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    # Raises RuntimeError("no running event loop"): asyncio.create_task()
    # looks up the running loop, and `loop` has not been started yet.
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Results with:
Traceback (most recent call last):
File "C:/tmp/stack_overflow.py", line 42, in <module>
tasks.append(asyncio.create_task(func(i)))
File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited
Nonetheless, the solution is pretty simple: you just need to add the tasks to the loop you created, instead of asking asyncio
to do it.
The only change needed is in the following line:
tasks.append(asyncio.create_task(func(i)))
Change the creation of the task from asyncio
to the newly created loop. You are able to do this because it is your own loop, unlike asyncio, which searches for a running one.
So the new line should look like this:
tasks.append(loop.create_task(func(i)))
Another solution could be to run an async function and create the tasks there, for example (because that loop is already running, asyncio
is able to attach tasks to it):
import asyncio


async def func(num):
    """Print a start message, sleep 0.1 s without blocking the loop, then print an end message."""
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))


async def create_tasks_func():
    """Schedule five ``func`` tasks on the running loop and wait for all of them."""
    # create_task() works here because this coroutine only ever executes
    # inside a running event loop.
    tasks = [asyncio.create_task(func(i)) for i in range(5)]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    # asyncio.run() creates the loop, runs the coroutine and closes the loop,
    # replacing the manual get_event_loop()/run_until_complete()/close() steps.
    asyncio.run(create_tasks_func())
This simple change results in:
Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With