I have a medium-sized file (~300MB) containing a list of individuals (~300k) and the actions they performed. I'm trying to apply an operation to each individual using groupby and the parallelized version of apply described here. It looks something like this:
import multiprocessing

import pandas as pd
from joblib import Parallel, delayed

def applyParallel(dfGrouped, func):
    # run func on every group in parallel, then stitch the per-group results back together
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

df = pd.read_csv(src)
patients_table_raw = applyParallel(df.groupby('ID'), f)
But unfortunately this consumes a huge amount of memory. I think it is related to the fact that even the simple command:
list_groups = list(df.groupby('ID'))
consumes several GB of memory! How to proceed? My initial thought was to iterate over the groupby in small chunks, so as not to hold too much in memory at once (but I didn't find a way to do so without casting it to a list; see the sketch below).
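One shape such chunked iteration could take (a minimal sketch I did not end up using; chunker, applyParallelChunked, and the chunk size of 10,000 are made up for illustration, and the full DataFrame is of course still held in memory):

import itertools
import multiprocessing

import pandas as pd
from joblib import Parallel, delayed

def chunker(iterable, size):
    # yield lists of at most `size` (name, group) pairs without materializing the whole iterator
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk

def applyParallelChunked(dfGrouped, func, chunk_size=10000):
    # process the groupby iterator one batch at a time, holding only chunk_size groups in memory
    partial_results = []
    for chunk in chunker(dfGrouped, chunk_size):
        retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in chunk)
        partial_results.append(pd.concat(retLst))
    return pd.concat(partial_results)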
I have a simple CSV dataset in the following fashion:
|----|-----------|--------|
| ID | Timestamp | Action |
|----|-----------|--------|
| 1  | 0         | A      |
| 1  | 10        | B      |
| 1  | 20        | C      |
| 2  | 0         | B      |
| 2  | 15        | C      |
...
What I'm basically trying to do is create a different table that contains, for each ID, a description of that individual's sequence of timestamps and actions. This will later help me retrieve the individuals:
|----|-------------|
| ID | Description |
|----|-------------|
| 1  | 0A10B20C    |
| 2  | 0B15C       |
...
In order to do so, and to keep things Pythonic, my idea was basically to load the first table into a pandas DataFrame, group by the ID, and apply a function to each group that returns one row of the target table per ID. However, I have LOTS of individuals in my dataset (around 1 million), and the groupby operation was extremely expensive (without explicit garbage collection, as I mention in my own answer). Also, parallelizing the groupby led to significant memory use, because apparently some things get duplicated.
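For concreteness, the per-group function f I pass to apply looks roughly like this (a minimal sketch; the real one in my code may differ in details such as sorting):

import pandas as pd

def f(group):
    # collapse one individual's rows into a single (ID, Description) row
    group = group.sort_values('Timestamp')
    desc = ''.join(f"{t}{a}" for t, a in zip(group['Timestamp'], group['Action']))
    return pd.DataFrame({'ID': [group['ID'].iloc[0]], 'Description': [desc]})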
Therefore, the more detailed question is: how can I use groupby (and thereby make the data processing faster than a hand-rolled loop) without incurring this huge memory overhead?
Some comments and then the solution I've found:
I've tried dask and it didn't make much difference. I guess that's because the file is not big enough to need out-of-core (on-disk) processing.
The memory footprint improves significantly if you perform garbage collection inside the function you apply to the groups. I managed to do so with a simple gc.collect() (after importing gc) that runs every 10,000 iterations. Something like:

if x['ID'].head(1).values[0] % 10000 == 0:
    gc.collect()
The garbage collection actually made my parallel version run. But the final return pd.concat(retLst) was another huge bottleneck and consumed tons of memory!
My final solution was to parallelize at an outer level:
I created a function that performs the groupby and the apply for individuals whose IDs fall inside a range [X, Y) (sketched after the driver code below)
I then simply create a pool and run those ranges in parallel. Each process saves its result to a file whose name depends on its range
import functools
from multiprocessing import Pool, cpu_count

f = functools.partial(make_patient_tables2, src="in", dest="out")
range_of = [(0, 10000), (10000, 20000), (20000, 30000)]
with Pool(cpu_count()) as p:
    ret_list = p.map(f, range_of)
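make_patient_tables2 itself is not shown above; a minimal sketch of what it does under my assumptions (the column names, the description-building logic, and the out_<lo>_<hi>.csv naming are placeholders from my setup and may need adapting):

import gc

import pandas as pd

def make_patient_tables2(id_range, src, dest):
    # group and describe the individuals whose IDs fall in [lo, hi), writing one file per range
    lo, hi = id_range
    df = pd.read_csv(src)  # each worker re-reads the source table
    subset = df[(df['ID'] >= lo) & (df['ID'] < hi)].sort_values(['ID', 'Timestamp'])
    described = subset.groupby('ID').apply(
        lambda g: ''.join(f"{t}{a}" for t, a in zip(g['Timestamp'], g['Action']))
    )
    described.rename('Description').reset_index().to_csv(f"{dest}_{lo}_{hi}.csv", index=False)
    del df, subset, described
    gc.collect()  # release memory before returning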
Last but not least, I concatenate all the generated files.
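That last step is just reading the per-range files back and stacking them, roughly (a sketch, assuming the out_<lo>_<hi>.csv naming above and a made-up final file name):

import glob

import pandas as pd

parts = sorted(glob.glob("out_*_*.csv"))
patients_table = pd.concat((pd.read_csv(p) for p in parts), ignore_index=True)
patients_table.to_csv("patients_table.csv", index=False)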
Notice that this is still somewhat memory-intensive, since each worker has to re-read the source table (that read happens inside make_patient_tables2, but it would happen anyway, as multiprocessing doesn't share resources). A better solution would therefore involve sharing the data between processes, but the garbage collector + avoiding the big concat + replicating the original data only 2-3 times was enough for me!
Certainly not pretty. Hope it can be of help for someone else.