I am using Python multiprocessing, more precisely
    from multiprocessing import Pool

    p = Pool(15)
    args = [(df, config1), (df, config2), ...]  # list of args - df is the same object in each tuple
    res = p.map_async(func, args)  # func is some arbitrary function
    p.close()
    p.join()
This approach consumes a huge amount of memory, eating up pretty much all my RAM (at which point it gets extremely slow, making the multiprocessing pretty useless). I assume the problem is that df is a huge object (a large pandas dataframe) and it gets copied for each process. I have tried using multiprocessing.Value to share the dataframe without copying
    shared_df = multiprocessing.Value(pandas.DataFrame, df)
    args = [(shared_df, config1), (shared_df, config2), ...]
(as suggested in Python multiprocessing shared memory), but that gives me TypeError: this type has no size
(same as Sharing a complex object between Python processes?, to which I unfortunately don't understand the answer).
I am using multiprocessing for the first time and maybe my understanding is not (yet) good enough. Is multiprocessing.Value
actually even the right thing to use in this case? I have seen other suggestions (e.g. queue) but am by now a bit confused. What options are there to share memory, and which one would be best in this case?
Python 3.8 introduced a new module, multiprocessing.shared_memory, that provides shared memory for direct access across processes. In my test it significantly reduced memory usage, which also sped up the program by cutting the cost of copying and moving data around.
It makes it very easy to do multiprocessing in Pandas. This package works like a charm on my MacBook.
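To make the shared_memory idea concrete, here is a minimal sketch, assuming the dataframe holds only numeric data of a single dtype. The helper names put_in_shared_memory and column_sum, and the column-sum workload, are made up for illustration. The parent copies the values into a shared block once, and each worker attaches to that block by name instead of receiving a pickled copy.

```python
import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory


def put_in_shared_memory(values):
    """Copy a NumPy array into a new shared block; the caller must unlink it."""
    shm = SharedMemory(create=True, size=values.nbytes)
    view = np.ndarray(values.shape, dtype=values.dtype, buffer=shm.buf)
    view[:] = values  # the single copy made in the parent
    return shm


def column_sum(task):
    """Worker: attach to the block by name and sum one column (hypothetical workload)."""
    name, shape, dtype, col = task
    shm = SharedMemory(name=name)  # attaches to existing memory, no data copy
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    total = float(view[:, col].sum())
    del view  # drop the NumPy view before closing, otherwise close() raises BufferError
    shm.close()
    return total


if __name__ == "__main__":
    df = pd.DataFrame(np.arange(12, dtype="float64").reshape(4, 3), columns=list("abc"))
    values = df.to_numpy()
    shm = put_in_shared_memory(values)
    try:
        # only the block name, shape, dtype and column index are pickled per task
        tasks = [(shm.name, values.shape, values.dtype.str, col)
                 for col in range(values.shape[1])]
        with Pool(2) as pool:
            print(pool.map(column_sum, tasks))
    finally:
        shm.close()
        shm.unlink()  # free the block once all workers are done
```

Mixed-dtype or object columns do not fit this layout directly; one shared block per column would be one way around that.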
The multiprocessing package supports spawning processes, that is, creating and running new child processes. The parent process can wait for a child to finish with join(), using an API similar to that of the threading module.
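As a small illustration of that API, here is a sketch that starts a few child processes and waits for them. The functions square and worker are placeholders, not anything from the question.

```python
from multiprocessing import Process, Queue


def square(n):
    """Placeholder computation."""
    return n * n


def worker(n, results):
    """Run in a child process and send the result back through a queue."""
    results.put((n, square(n)))


if __name__ == "__main__":
    results = Queue()
    procs = [Process(target=worker, args=(n, results)) for n in range(3)]
    for p in procs:
        p.start()
    # drain the queue before joining, so children are not blocked on a full pipe
    collected = dict(results.get() for _ in procs)
    for p in procs:
        p.join()
    print(collected)
```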
The first argument to Value is typecode_or_type. That is defined as:
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
Emphasis mine. So, you simply cannot put a pandas dataframe in a Value; it has to be a ctypes type.
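For comparison, this is a short sketch of what Value does accept, plus a hypothetical helper (try_unsupported) that reproduces the failure for an arbitrary Python type. The variable names are illustrative only.

```python
import ctypes
from multiprocessing import Value

counter = Value(ctypes.c_int, 0)  # a ctypes type
ratio = Value("d", 0.5)           # 'd' is the array-module typecode for a C double

# synchronized wrappers carry a lock for read-modify-write updates
with counter.get_lock():
    counter.value += 1


def try_unsupported(obj_type):
    """Return the error message if obj_type cannot back a Value, else None."""
    try:
        Value(obj_type)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(counter.value, ratio.value)
    print(try_unsupported(dict))  # a non-ctypes type is rejected with a TypeError
```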
You could instead use a multiprocessing.Manager to serve your singleton dataframe instance to all of your processes. There are a few different ways to end up in the same place; probably the easiest is to just plop your dataframe into the manager's Namespace.
    from multiprocessing import Manager

    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = my_dataframe

    # now just give your processes access to ns, i.e. most simply
    # p = Process(target=worker, args=(ns, work_unit))
Now your dataframe instance is accessible to any process that gets passed a reference to the Manager. Or just pass a reference to the Namespace, it's cleaner.
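A fuller sketch of the Namespace approach could look like the following. The worker function, the column-sum workload and the results dict are assumptions added for illustration. One caveat: reading ns.df goes through a proxy, so the manager pickles the frame and sends the reading process its own copy. That keeps a single authoritative instance, but it is not zero-copy.

```python
import pandas as pd
from multiprocessing import Manager, Process


def worker(ns, column, results):
    """Fetch the frame from the namespace and store one column's sum (hypothetical workload)."""
    df = ns.df  # proxy access: this process receives a pickled copy of the frame
    results[column] = float(df[column].sum())


if __name__ == "__main__":
    with Manager() as mgr:
        ns = mgr.Namespace()
        ns.df = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
        results = mgr.dict()  # shared dict to collect the answers
        procs = [Process(target=worker, args=(ns, col, results)) for col in ("a", "b")]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(dict(results))
```

Fetching ns.df once per worker and keeping the local reference avoids repeating the transfer on every access.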
One thing I didn't/won't cover is events and signaling: if your processes need to wait for others to finish executing, you'll need to add that in. Here is a page with some Event examples which also cover in a bit more detail how to use the manager's Namespace.
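For the signaling part, a minimal sketch with multiprocessing.Event might look like this. The producer/consumer split and the function names are invented for the example: one process signals that the data is ready and another waits for that signal.

```python
import time
from multiprocessing import Event, Process


def wait_for_data(ready, timeout=5.0):
    """Block until `ready` is set; return False if the timeout expires first."""
    return ready.wait(timeout)


def producer(ready):
    time.sleep(0.1)  # stand-in for loading or preparing data
    ready.set()      # wake up every process waiting on the event


def consumer(ready):
    if wait_for_data(ready):
        print("data is ready, starting work")
    else:
        print("gave up waiting")


if __name__ == "__main__":
    ready = Event()
    procs = [
        Process(target=consumer, args=(ready,)),
        Process(target=producer, args=(ready,)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```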
(Note that none of this addresses whether multiprocessing is going to result in tangible performance benefits; this is just giving you the tools to explore that question.)
You can use Array instead of Value for storing your dataframe.
The solution below converts a pandas dataframe to an object that stores its data in shared memory:
    import ctypes
    import multiprocessing as mp

    import numpy as np
    import pandas as pd

    # the original dataframe is df; store the column/dtype pairs
    df_dtypes_dict = dict(zip(df.columns, df.dtypes))

    # declare a shared Array with data from df (assumes all columns are numeric)
    mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))

    # create a new df based on the shared array
    # (note: astype may copy columns out of the shared buffer, depending on dtypes and pandas version)
    df_shared = pd.DataFrame(
        np.frombuffer(mparr.get_obj()).reshape(df.shape),
        columns=df.columns,
    ).astype(df_dtypes_dict)
If you now share df_shared across processes, no additional copies will be made. For your case:
    def fun(config):
        # df_shared is global to the script
        df_shared.apply(config)  # whatever compute you do with df/config

    # create the pool after fun is defined so the workers can see it
    pool = mp.Pool(15)
    config_list = [config1, config2]
    res = pool.map_async(fun, config_list)
    pool.close()
    pool.join()
This is also particularly useful if you use pandarallel, for example:
    # this will not explode in memory
    from pandarallel import pandarallel

    pandarallel.initialize()
    df_shared.parallel_apply(your_fun, axis=1)
Note: with this solution you end up with two dataframes (df and df_shared), which together consume twice the memory and take a long time to initialise. It might be possible to read the data directly into shared memory.
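One way to read directly into shared memory is to allocate the shared buffer first and stream the file into it chunk by chunk, so the full unshared dataframe never exists. This is only a rough sketch under some assumptions: the data is a purely numeric CSV, the row and column counts are known in advance, and read_csv_into_shared is a made-up helper name.

```python
import ctypes
import io
import multiprocessing as mp

import numpy as np
import pandas as pd


def read_csv_into_shared(source, n_rows, n_cols, chunksize=2):
    """Stream a numeric CSV into a shared float64 buffer, one chunk at a time."""
    raw = mp.RawArray(ctypes.c_double, n_rows * n_cols)  # shared, zero-initialised
    view = np.frombuffer(raw, dtype=np.float64).reshape(n_rows, n_cols)
    columns = None
    start = 0
    for chunk in pd.read_csv(source, chunksize=chunksize):
        columns = chunk.columns
        stop = start + len(chunk)
        view[start:stop, :] = chunk.to_numpy(dtype=np.float64)  # only one chunk is held unshared
        start = stop
    # copy=False asks pandas to wrap the shared buffer instead of copying it
    return raw, pd.DataFrame(view, columns=columns, copy=False)


if __name__ == "__main__":
    csv_text = io.StringIO("a,b\n1,2\n3,4\n5,6\n")  # stand-in for a large file on disk
    raw, df_shared = read_csv_into_shared(csv_text, n_rows=3, n_cols=2)
    print(df_shared)
```

The raw buffer would still need to reach the workers, for example through inheritance at fork time or as a Pool initializer argument, with each worker rebuilding its own view using np.frombuffer.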