I just read this introduction, but am having trouble implementing either of the examples (commented code being the second example):
import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
    async def clean_df(df):
        df.drop(["column_1"], axis=1, inplace=True)
        ... a bunch of other inplace=True functions ...
        return "Done"

    # tasks = [clean_df(df) for (table, dfs) in dfs.items()]
    # await asyncio.gather(*tasks)
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

def main():
    dfs = {
        sn: pd.read_excel("excel.xlsx", sheet_name=sn)
        for sn in load_workbook("excel.xlsx").sheetnames
    }
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(loop_dfs(dfs))
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(loop_dfs(dfs))
    finally:
        loop.close()

main()
I saw a few other posts about how pandas doesn't support asyncio, and maybe i'm just missing a bigger picture, but that shouldn't matter if i'm doing inplace operations right? I saw recommendations for Dask but without immediate support for reading excel, figured i'd try this first but I keep getting
RuntimeError: Event loop already running
If you set inplace = True , the drop() method will delete rows or columns directly from the original dataframe. Said differently, if you set inplace = True , Pandas will overwrite your data instead of producing a new dataframe as an output. Be careful when you use this parameter, since it will overwrite your data.
Simply put, thread-safe means that a resource can safely be accessed by more than one thread at the same time. Asyncio fundamentally uses a single thread; however, more than one asyncio task can still access the same resource concurrently, much as in multi-threading.
asyncio.run() should be used as a main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.
Standard library asyncio is definitely slower than most multi-threaded frameworks, because asyncio executes a lot of Python for each event. Generally frameworks are faster the more that they're implemented in C or another compiled language.
I saw a few other posts about how pandas doesn't support asyncio, and maybe i'm just missing a bigger picture, but that shouldn't matter if i'm doing inplace operations right?
In-place operations are those that modify existing data. That is a matter of efficiency, whereas your goal appears to be parallelization, an entirely different matter.
Pandas doesn't support asyncio not only because this wasn't yet implemented, but because Pandas doesn't typically do the kind of operations that asyncio supports well: network and subprocess IO. Pandas functions either use the CPU or wait for disk access, neither of which is a good fit for asyncio. Asyncio allows network communication to be expressed with coroutines that look like ordinary synchronous code. Inside a coroutine each blocking operation (e.g. a network read) is awaited, which automatically suspends the whole task if the data is not yet available. At each such suspension the system switches to the next task, creating effectively a cooperative multi-tasking system.
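To make the task switching concrete, here is a minimal sketch. The names `fetch` and `demo` are made up for illustration, and `asyncio.sleep` merely stands in for a real network read. Each `await` is a point where the running task can be suspended so another one can proceed:

```python
import asyncio

async def fetch(name, delay, log):
    # Hypothetical stand-in for a network request.
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # suspends this task; the event loop runs others
    log.append(f"{name} done")

async def demo():
    log = []
    # Both coroutines are scheduled as tasks and take turns at each await.
    await asyncio.gather(fetch("a", 0.2, log), fetch("b", 0.1, log))
    return log

events = asyncio.run(demo())
print(events)  # ['a start', 'b start', 'b done', 'a done']
```

Task "b" finishes before "a" even though it was started second, because "a" gave up control while it was waiting.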
When trying to call a library that doesn't support asyncio, such as pandas, things will superficially appear to work, but you won't get any benefit and the code will run serially. For example:
async def loop_dfs(dfs):
    async def clean_df(df):
        ...
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)
Since clean_df doesn't contain a single instance of await, it is a coroutine in name only - it will never actually suspend its execution to allow other coroutines to run. Thus await asyncio.wait(tasks) will run the tasks in series, as if you wrote:
for table, df in dfs.items():
    clean_df(df)
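The serial behavior is easy to observe with a small sketch. The coroutine `no_await` below is a made-up stand-in for a pandas cleanup function: it does some CPU work but never awaits anything, so the event loop gets no chance to switch tasks in the middle of it.

```python
import asyncio

async def no_await(name, log):
    # Hypothetical stand-in for clean_df: CPU work only, no await inside.
    log.append(f"{name} start")
    sum(range(100_000))  # never yields control to the event loop
    log.append(f"{name} done")

async def demo():
    log = []
    await asyncio.gather(no_await("a", log), no_await("b", log))
    return log

serial_events = asyncio.run(demo())
print(serial_events)  # ['a start', 'a done', 'b start', 'b done']
```

Each task runs to completion before the next one starts, exactly as a plain for loop would.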
To get things to run in parallel (provided pandas occasionally releases the GIL during its operations), you should hand off the individual CPU-bound functions to a thread pool:
async def loop_dfs(dfs):
    def clean_df(df):  # note: ordinary def
        ...
    loop = asyncio.get_event_loop()
    # the first argument is the executor; None selects the default thread pool
    tasks = [loop.run_in_executor(None, clean_df, df)
             for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)
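For a version you can run without an Excel file, here is a rough sketch of the same hand-off. It is only an illustration: `clean_rows`, `loop_tables` and the `tables` data are invented, with lists of dicts standing in for DataFrames. Note that `run_in_executor` takes the executor as its first argument, and `None` selects the loop's default thread pool.

```python
import asyncio

def clean_rows(rows):
    # Hypothetical stand-in for clean_df: drops "column_1" in place.
    for row in rows:
        row.pop("column_1", None)
    return "Done"

async def loop_tables(tables):
    loop = asyncio.get_running_loop()
    # Each call runs in a worker thread and returns an awaitable future.
    futures = [loop.run_in_executor(None, clean_rows, rows)
               for rows in tables.values()]
    return await asyncio.gather(*futures)

tables = {
    "sheet1": [{"column_1": 1, "x": 2}],
    "sheet2": [{"column_1": 3, "y": 4}],
}
results = asyncio.run(loop_tables(tables))
```

Because the cleanup mutates its argument in place, the caller sees the modified data in `tables` once the futures have completed.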
If you go down that route, you don't need asyncio in the first place; you can simply use concurrent.futures. For example:
import concurrent.futures

def loop_dfs(dfs):  # note: ordinary def
    def clean_df(df):
        ...
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(clean_df, df)
                   for (table, df) in dfs.items()]
        concurrent.futures.wait(futures)
figured i'd try this first but I keep getting
RuntimeError: Event loop already running
That error typically means that you've started the script in an environment that already runs asyncio for you, such as a Jupyter notebook. If that is the case, make sure that you run your script with stock python, or consult your notebook's documentation on how to change your code to submit the coroutines to the event loop that already runs.
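If you are not sure which situation you are in, a quick check along these lines may help. The helper name `loop_is_running` is made up for this sketch; it relies on `asyncio.get_running_loop()` raising `RuntimeError` when no loop is running in the current thread.

```python
import asyncio

def loop_is_running():
    # True if an event loop is already running in this thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def check_inside():
    return loop_is_running()

outside = loop_is_running()            # plain script: no loop yet
inside = asyncio.run(check_inside())   # inside a coroutine: loop is running
```

When run as a plain script, `outside` is False and `inside` is True. In a recent Jupyter/IPython session the check at top level would report a running loop, in which case you would normally write `await loop_dfs(dfs)` directly in a cell instead of calling `run_until_complete`.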