I have a large dataframe (several million rows).
I want to be able to do a groupby operation on it, but just grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.
The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, as the function calculates a result based on one row at a time. (Conceptually at least; in reality it's vectorized.)
I've come up with something like this:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to max_idx = dataframe.index.max() tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32) # Use this value to perform a groupby, yielding 10 consecutive chunks groups = [g[1] for g in dataframe.groupby(tenths)] # Process chunks in parallel results = dview.map_sync(my_function, groups)
But this seems very long-winded, and doesn't guarantee equal sized chunks. Especially if the index is sparse or non-integer or whatever.
Any suggestions for a better way?
Thanks!
Vectorization is always the first and best choice. You can convert the data frame to NumPy array or into dictionary format to speed up the iteration workflow. Iterating through the key-value pair of dictionaries comes out to be the fastest way with around 280x times speed up for 20 million records.
By using apply and specifying one as the axis, we can run a function on every row of a dataframe. This solution also uses looping to get the job done, but apply has been optimized better than iterrows , which results in faster runtimes.
In order to iterate over rows, we apply a function itertuples() this function return a tuple for each row in the DataFrame. The first element of the tuple will be the row's corresponding index value, while the remaining values are the row values.
To summarize, if your apps save/load data from disk frequently, then it's a wise decision to leave these operations to PyArrow. Heck, it's 7 times faster for the identical file format. Imagine we introduced Parquet file format to the mix.
Use numpy's array_split():
import numpy as np import pandas as pd data = pd.DataFrame(np.random.rand(10, 3)) for chunk in np.array_split(data, 5): assert len(chunk) == len(data) / 5, "This assert may fail for the last chunk if data lenght isn't divisible by 5"
I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.
Here's a short example from that thread, which might do something like what you want:
import numpy as np import pandas as pds df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd']) def chunker(seq, size): return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) for i in chunker(df,5): print i
Which gives you something like this:
a b c d 0 0.860574 0.059326 0.339192 0.786399 1 0.029196 0.395613 0.524240 0.380265 2 0.235759 0.164282 0.350042 0.877004 3 0.545394 0.881960 0.994079 0.721279 4 0.584504 0.648308 0.655147 0.511390 a b c d 5 0.276160 0.982803 0.451825 0.845363 6 0.728453 0.246870 0.515770 0.343479 7 0.971947 0.278430 0.006910 0.888512 8 0.044888 0.875791 0.842361 0.890675 9 0.200563 0.246080 0.333202 0.574488 a b c d 10 0.971125 0.106790 0.274001 0.960579 11 0.722224 0.575325 0.465267 0.258976 12 0.574039 0.258625 0.469209 0.886768 13 0.915423 0.713076 0.073338 0.622967
I hope that helps.
EDIT
In this case, I used this function with pool of processors in (approximately) this manner:
from multiprocessing import Pool nprocs = 4 pool = Pool(nprocs) for chunk in chunker(df, nprocs): data = pool.map(myfunction, chunk) data.domorestuff()
I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With