Suppose I have a pandas dataframe and a function I'd like to apply to each row. I can call `df.apply(apply_fn, axis=1)`, which should take time linear in the size of `df`. Or I can split `df` and use `pool.map` to call my function on each piece, then concatenate the results.
I was expecting the speedup factor from using `pool.map` to be roughly equal to the number of processes in the pool (`new_execution_time = original_execution_time / N` if using `N` processors -- and that's assuming zero overhead).
Instead, in this toy example, time falls to around 2% of the original (0.005272 / 0.230757) when using 4 processors. I was expecting 25% at best. What is going on, and what am I not understanding?
import numpy as np
from multiprocessing import Pool
import pandas as pd
import pdb
import time
n = 1000
variables = {"hello":np.arange(n), "there":np.random.randn(n)}
df = pd.DataFrame(variables)
def apply_fn(series):
    return pd.Series({"col_5": 5, "col_88": 88,
                      "sum_hello_there": series["hello"] + series["there"]})
def call_apply_fn(df):
    return df.apply(apply_fn, axis=1)
n_processes = 4 # My machine has 4 CPUs
pool = Pool(processes=n_processes)
t0 = time.process_time()
new_df = df.apply(apply_fn, axis=1)
t1 = time.process_time()
df_split = np.array_split(df, n_processes)
pool_results = pool.map(call_apply_fn, df_split)
new_df2 = pd.concat(pool_results)
t2 = time.process_time()
new_df3 = df.apply(apply_fn, axis=1) # Try df.apply a second time
t3 = time.process_time()
print("identical results: %s" % np.all(np.isclose(new_df, new_df2))) # True
print("t1 - t0 = %f" % (t1 - t0)) # I got 0.230757
print("t2 - t1 = %f" % (t2 - t1)) # I got 0.005272
print("t3 - t2 = %f" % (t3 - t2)) # I got 0.229413
I saved the code above and ran it using `python3 my_filename.py`.
PS I realize that in this toy example `new_df` can be created in a much more straightforward way, without using `apply`. I'm interested in applying similar code with a more complex `apply_fn` that doesn't just add columns.
Edit (my previous answer was actually wrong):
`time.process_time()` (doc) measures time only in the current process (and doesn't include sleeping time). So the time spent in child processes is not taken into account.
I ran your code with `time.time()`, which measures real-world time (showing no speedup at all), and with the more reliable `timeit.timeit` (about 50% speedup). I have 4 cores.