I have a very large CSV file (tens of gigabytes) containing web logs with the following columns: user_id, time_stamp, category_clicked. I have to build a scorer to identify which categories users like and dislike. Note that I have more than 10 million users.
I first cut it into chunks and store them in an HDFStore named input.h5, then I use groupby on user_id following Jeff's way.
Here is my data: about 200 million rows, 10 million unique user_ids.
user id | timestamp | category_clicked
20140512081646222000004-927168801|20140722|7
20140512081714121000004-383009763|20140727|4
201405011348508050000041009490586|20140728|1
20140512081646222000004-927168801|20140724|1
20140501135024818000004-1623130763|20140728|3
Here is my pandas.show_versions() output:
INSTALLED VERSIONS
------------------
commit: None
python: 2.7.6.final.0
python-bits: 64
OS: Windows
OS-release: 8
machine: AMD64
processor: AMD64 Family 21 Model 2 Stepping 0, AuthenticAMD
byteorder: little
LC_ALL: None
LANG: fr_FR
pandas: 0.13.1
Cython: 0.20.1
numpy: 1.8.1
scipy: 0.13.3
statsmodels: 0.5.0
IPython: 2.0.0
sphinx: 1.2.2
patsy: 0.2.1
scikits.timeseries: None
dateutil: 2.2
pytz: 2013.9
bottleneck: None
tables: 3.1.1
numexpr: 2.3.1
matplotlib: 1.3.1
openpyxl: None
xlrd: 0.9.3
xlwt: 0.7.5
xlsxwriter: None
sqlalchemy: 0.9.4
lxml: None
bs4: None
html5lib: None
bq: None
apiclient: None
Here is what I want as an output:
for each user_id, a list [0.1,0.45,0.89,1.45,5.12,0.,0.,0.45,0.12,2.36,7.8] representing the user's score for each category and a global score. I can't tell you more about the score, but it needs ALL the timestamps and all the category_clicked values to be calculated. You can't sum up later or anything like that.
Here is my code:
from pandas import read_csv, get_store, Series

clean_input_reader = read_csv(work_path + '/input/input.csv', chunksize=500000)
with get_store(work_path + '/input/input.h5') as store:
    for chunk in clean_input_reader:
        store.append('clean_input', chunk,
                     data_columns=['user_id', 'timestamp', 'category_clicked'],
                     min_itemsize=15)

    groups = store.select_column('clean_input', 'user_id').unique()
    for user in groups:
        group_user = store.select('clean_input', where=['user_id==%s' % user])
        # <<<<TREATMENT returns a list user_cat_score>>>>
        store.append(user, Series(user_cat_score))
My question is the following: it looks to me that the line
group_user = store.select('clean_input', where=['user_id==%s' % user])
is too heavy in time complexity, since I have a very large number of groups, and I am sure there is a lot of redundant sorting in the store.select routine if I apply it 10 million times.
To give you an estimate, it takes 250 seconds to process 1000 keys with this technique, versus only 1 second with a usual groupby on the full in-memory CSV file read with read_csv without chunking.
**********UPDATE***********
After applying Jeff's hashing method, I could process 1000 keys in 1 second (the same timing as the full in-memory method), and RAM usage dropped substantially. The only time penalty I did not have before is, of course, the time spent chunking, saving the 100 hash groups, and recovering the real groups from the hashed ones in the store. But this operation takes no more than a few minutes.
Here's a solution for scaling this problem arbitrarily. This is, in effect, a high-density version of this question here.
Define a function to hash a particular group value to a smaller number of groups. I would design this such that it divides your dataset into in-memory manageable pieces.
def sub_group_hash(x):
    # x is a dataframe with the 'user id' field given above
    # return the last 2 characters of the input
    # if these are number like, then you will be sub-grouping into 100 sub-groups
    return x['user id'].str[-2:]
Using the data provided above, this creates a grouped frame on the input data like so:
In [199]: [ (grp, grouped) for grp, grouped in df.groupby(sub_group_hash(df)) ][0][1]
Out[199]:
                             user id  timestamp  category
0  20140512081646222000004-927168801   20140722         7
3  20140512081646222000004-927168801   20140724         1
with grp as the name of the group, and grouped as the resultant frame.
from pandas import read_csv, get_store

# read in the input in a chunked way
clean_input_reader = read_csv('input.csv', chunksize=500000)
with get_store('output.h5') as store:
    for chunk in clean_input_reader:
        # create a grouper for each chunk using the sub_group_hash
        # (call it on the chunk; a bare function would be applied to the index labels)
        g = chunk.groupby(sub_group_hash(chunk))
        # append each of the subgroups to a separate group in the resulting hdf file
        # this will be a loop around the sub_groups (100 max in this case)
        for grp, grouped in g:
            store.append('group_%s' % grp, grouped,
                         data_columns=['user_id','timestamp','category_clicked'],
                         min_itemsize=15)
Now you have an HDF file with 100 sub-groups (potentially fewer if not all groups were represented), each of which contains all of the data necessary for performing your operation.
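To see why each sub-group ends up complete, here is a minimal sketch on the five sample rows from the question. It is only an illustration under a few assumptions: a plain dict stands in for the HDF5 store so that it runs without PyTables, the sample is read with a '|' separator, chunksize=2 is chosen only to force one user's rows into different chunks, and counting clicks per user is a placeholder for the real scoring function.

```python
import io
from collections import defaultdict

import pandas as pd

# Sample rows from the question; column names are taken from its header line.
CSV_TEXT = """user id|timestamp|category_clicked
20140512081646222000004-927168801|20140722|7
20140512081714121000004-383009763|20140727|4
201405011348508050000041009490586|20140728|1
20140512081646222000004-927168801|20140724|1
20140501135024818000004-1623130763|20140728|3
"""


def sub_group_hash(frame):
    # Same idea as above: the bucket label is the last 2 characters of the id.
    return frame["user id"].str[-2:]


# Stand-in for the HDF5 store (an assumption of this sketch):
# bucket label -> list of partial frames collected across chunks.
buckets = defaultdict(list)

reader = pd.read_csv(io.StringIO(CSV_TEXT), sep="|",
                     dtype={"user id": str}, chunksize=2)
for chunk in reader:
    for label, part in chunk.groupby(sub_group_hash(chunk)):
        buckets[label].append(part)

# Every row of a given user carries the same label, so once all chunks are
# consumed each bucket holds the complete history of the users it contains.
bucket_frames = {label: pd.concat(parts) for label, parts in buckets.items()}

# Work bucket by bucket; counting clicks is a placeholder for the real scorer.
per_bucket_counts = pd.concat(
    [frame.groupby("user id").size() for frame in bucket_frames.values()]
).sort_index()
print(per_bucket_counts)
```

The per-bucket result matches what a single in-memory groupby over the whole file would give, which is the property the approach relies on.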
with get_store('output.h5') as store:
    # all of the groups are now the keys of the store
    for grp in store.keys():
        # this is a complete group that will fit in memory
        grouped = store.select(grp)
        # perform the operation on grouped and write the new output
        grouped.groupby(......).apply(your_cool_function)
So this will reduce the problem by a factor of 100 in this case. If that is not sufficient, then simply increase the sub_group_hash to make more groups.
You should strive for a smaller number of sub-groups, as HDF5 works better that way (e.g. don't make 10M sub_groups, which defeats the purpose; 100, 1000, or even 10k is OK). But I think 100 should probably work for you, unless you have a very wild group density (e.g. massive numbers of rows in a single group and very few in the others).
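A rough way to check the group density before committing to a scheme is to look at how large the biggest bucket gets. The sketch below is an illustration, not part of the method above: suffix_bucket is the same last-characters idea with the length as a parameter, crc32_bucket is a swapped-in alternative (a checksum of the whole id modulo a bucket count) that may help when the id suffixes are badly distributed, and the ids are made up to show the worst case.

```python
import zlib

import pandas as pd


def suffix_bucket(user_ids, n_chars=2):
    # The scheme from above with a configurable suffix length: for digit
    # suffixes, n_chars=2 gives at most 100 buckets, n_chars=3 at most 1000.
    return user_ids.str[-n_chars:]


def crc32_bucket(user_ids, n_buckets=100):
    # Alternative (an assumption, not from the answer): checksum of the whole
    # id modulo n_buckets. zlib.crc32 gives the same value on every run,
    # unlike the built-in hash() on strings.
    return user_ids.map(lambda uid: zlib.crc32(uid.encode("utf-8")) % n_buckets)


def largest_bucket_share(labels):
    # Fraction of all rows that land in the most populated bucket.
    return labels.value_counts().max() / len(labels)


# Hypothetical ids that all end in "00", the worst case for a 2-char suffix.
ids = pd.Series(["user-%04d00" % i for i in range(1000)])

print(largest_bucket_share(suffix_bucket(ids)))     # all rows share one bucket
print(largest_bucket_share(suffix_bucket(ids, 3)))  # longer suffix, more buckets
print(largest_bucket_share(crc32_bucket(ids)))      # checksum-based buckets
```

If the largest share is close to 1 / number_of_buckets the split is even; if one bucket dominates, it may no longer fit in memory and a different bucketing function is worth trying.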
Note that this problem then scales easily; you could store the sub_groups in separate files if you want, and/or work on them separately (in parallel) if necessary.
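As a sketch of the separate-files variant, the snippet below writes each sub-group to its own file and then processes the files one at a time. The assumptions are mine: CSV files in a temporary directory stand in for separate HDF5 files, the group_<label>.csv naming and both helper functions are hypothetical, and the click count is again a placeholder for the real scorer.

```python
import os
import tempfile

import pandas as pd


def append_to_bucket_files(chunk, out_dir, column="user id"):
    # Append each sub-group of this chunk to its own file, group_<label>.csv.
    for label, part in chunk.groupby(chunk[column].str[-2:]):
        path = os.path.join(out_dir, "group_%s.csv" % label)
        # Write the header only when the file is created.
        part.to_csv(path, mode="a", header=not os.path.exists(path), index=False)


def process_bucket_file(path, column="user id"):
    # Needs only its own file, so calls can run in any order or in
    # separate worker processes.
    bucket = pd.read_csv(path, dtype={column: str})
    return bucket.groupby(column).size()  # placeholder for the real scorer


sample = pd.DataFrame({
    "user id": ["20140512081646222000004-927168801",
                "20140512081714121000004-383009763",
                "201405011348508050000041009490586",
                "20140512081646222000004-927168801",
                "20140501135024818000004-1623130763"],
    "timestamp": [20140722, 20140727, 20140728, 20140724, 20140728],
    "category_clicked": [7, 4, 1, 1, 3],
})

with tempfile.TemporaryDirectory() as out_dir:
    # Feed the sample in two chunks, as a chunked reader would.
    for chunk in (sample.iloc[:3], sample.iloc[3:]):
        append_to_bucket_files(chunk, out_dir)
    files = sorted(os.listdir(out_dir))
    counts = pd.concat(
        [process_bucket_file(os.path.join(out_dir, name)) for name in files]
    )
print(counts)
```

Because process_bucket_file touches nothing outside its own file, the list comprehension could be replaced by a worker pool's map if the per-bucket work is heavy enough to justify it.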
This should make your solution time approximately O(number_of_sub_groups).