I have a large (~160 million rows) dataframe that I've stored to disk with something like this:
import glob
import pandas as pd

def fillStore(store, tablename):
    files = glob.glob('201312*.csv')
    names = ["ts", "c_id", "f_id", "resp_id", "resp_len", "s_id"]
    for f in files:
        df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
        store.append(tablename, df, format='table', data_columns=['c_id', 'f_id'])
The table has a time index and I will query using c_id and f_id in addition to times (via the index).
I have another dataframe containing ~18000 "incidents." Each incident consists of some (as few as hundreds, as many as hundreds of thousands) individual records. I need to collect some simple statistics for each incident and store them in order to collect some aggregate statistics. Currently I do this like so:
def makeQueryString(c, f, start, stop):
    return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(
        c, f, str(pd.to_datetime(start)), str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns=['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename,
                                                  makeQueryString(row.c_id,
                                                                  row.f_id,
                                                                  row.start,
                                                                  row.stop))).fillna(ind)
    return incidents
This all works fine except that each store.select() call takes roughly 5 seconds, which means that processing the full month's worth of data requires somewhere between 24 and 30 hours. Meanwhile, the actual statistics I need are relatively simple:
def getIncidentStats(df):
    incLen = (df.index[-1] - df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts / incLen
    # column names match the ones used when the store was filled
    return pd.Series({'c_id': df.c_id[0],
                      'f_id': df.f_id[0],
                      'Length_sec': incLen,
                      'num_rqsts': rqsts,
                      'rqst_rate': rqstRate_s,
                      'avg_resp_size': df.resp_len.mean(),
                      'std_resp_size': df.resp_len.std()})

incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)
My question is: how can I improve the performance or efficiency of any part of this workflow? (Please note that I actually batch most of the jobs to get and store incidents one day at a time, simply because I want to limit the risk of losing already processed data in the event of a crash. I left this code out here for simplicity and because I actually need to process the whole month's data.)
Is there a way to process the data as I receive it from the store and is there any benefit to this? Would I benefit from using store.select_as_index? If I receive an index I'd still need to access the data to get the statistics correct?
Other notes/questions: I have compared the performance of storing my HDFStore on both an SSD and a normal hard drive and didn't notice any improvement for the SSD. Is this expected?
I also toyed with the idea of creating a large conjunction of query strings and asking for them all at once. This causes memory errors when the total query string is too large (~5-10 queries).
Edit 1 If it matters, I am using tables version 3.1.0 and pandas version 0.13.1
Edit 2 Here is some more information:
ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
Here are samples of both the main table and inc_times:
In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449
s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727
[5 rows x 5 columns]
In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000
[5 rows x 4 columns]
Regarding c_id and f_id, the set of IDs I want to select from the full store is relatively small compared to the total number of IDs in the store. In other words, there are some popular IDs in inc_times that I will repeatedly query while completely ignoring some of the IDs that exist in the full table. I'd estimate that the IDs I care about are roughly 10% of the total IDs, but that these are the most popular IDs so their records dominate the full set.
I have 16GB RAM. The full store is 7.4G and the full dataset (as a csv file) is only 8.7 GB. Initially I believed I would be able to load the whole thing in memory and at least do some limited operations on it, but I get memory errors on loading the whole thing. Hence, batching it into daily files (the full file consists of data for one month).
Here are some recommendations; a similar question is here.
Use compression: see here. You should try this (this could make it faster / slower depending on exactly what you are querying), YMMV.
ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5
Use a hierarchical query in chunks. What I mean is this. Since you have a relatively small number of c_id and f_id that you care about, structure a single query something like this. This is kind of like using isin.
import pandas as pd

f_id_list = list_of_f_ids  # the f_ids that I care about
c_id_list = list_of_c_ids  # the c_ids that I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while True:
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches

def f(x):
    # you will need to filter out the min/max timestamps here (which I gather
    # are somewhat dependent on the f_id/c_id group)
    #### process the data and return something
    # for simple stats you could do something like this:
    return x.describe()

results = []
for f_id_batch in create_batches(f_id_list):
    for c_id_batch in create_batches(c_id_list):
        q = "f_id={f_id} & c_id={c_id}".format(
            f_id=f_id_batch,
            c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be the max/min
        # time for ALL the included batches though, maybe easy for you to compute)
        result = store.select('df', where=q)

        # sub process this result
        results.append(result.groupby(['f_id', 'c_id']).apply(f))

results = pd.concat(results)
The key here is to process so that the isin DOES NOT have more than 32 members for any variable that you are querying on. This is an internal numpy/pytables limitation. If you exceed this, the query will still work, but it will drop that variable and do a reindex on ALL the data (which is NOT what you want here).
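As a rough sketch of how such a batched where string could be put together, including the overall time window mentioned in the code comments, something like the following might work. The helper name `batch_where` is hypothetical (it is not part of pandas), and it assumes `inc_times` has the `c_id`, `f_id`, `start` and `stop` columns shown in the question. It returns `None` when no incident uses the ids in a batch pair, so that query can be skipped entirely.

```python
import pandas as pd

MAX_MEMBERS = 32  # per-variable list limit discussed above


def batch_where(inc_times, f_id_batch, c_id_batch):
    """Build a where string for one (f_id batch, c_id batch) pair.

    Hypothetical helper: returns None if no incident uses these ids.
    """
    if len(f_id_batch) > MAX_MEMBERS or len(c_id_batch) > MAX_MEMBERS:
        raise ValueError("keep each id list at or below %d members" % MAX_MEMBERS)

    mask = inc_times.f_id.isin(f_id_batch) & inc_times.c_id.isin(c_id_batch)
    sub = inc_times[mask]
    if sub.empty:
        return None

    # Time window covering ALL incidents in this batch; the per-incident
    # windows still have to be applied in memory afterwards.
    start = pd.to_datetime(sub.start.min())
    stop = pd.to_datetime(sub.stop.max())

    # Plain ints so the lists print as [1, 2, 3] regardless of numpy version.
    f_ids = [int(v) for v in f_id_batch]
    c_ids = [int(v) for v in c_id_batch]
    return ("f_id={} & c_id={} & index >= Timestamp('{}') & index < Timestamp('{}')"
            .format(f_ids, c_ids, start, stop))
```

The returned string can be passed as the `where` argument of `store.select`. The time bounds only trim the scan; they do not replace the per-incident filtering.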
This way you will have a nice subset of data in memory over just a few loops. These queries I think would take about the same time as most of your queries or so, but you will have way fewer.
The query time is roughly constant for a given subset (unless the data is ordered such that it is completely indexed).
So the query scans 'blocks' of data (which is what the indexes point to). If you have lots of hits across many blocks then the query is slower.
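A toy model can make the block effect concrete. This is only an illustration, not how PyTables actually lays out its indexes: it assumes fixed-size blocks (the size is borrowed from the chunkshape in the ptdump output) and simply counts how many blocks contain at least one matching row.

```python
def blocks_touched(hit_rows, block_size):
    """Count distinct fixed-size blocks that contain at least one matching row."""
    return len({row // block_size for row in hit_rows})


BLOCK_SIZE = 5461  # borrowed from the chunkshape above; illustrative only
N_HITS = 10000

# Same number of matching rows, laid out two different ways.
contiguous_hits = range(N_HITS)                  # matches stored next to each other
scattered_hits = range(0, N_HITS * 1000, 1000)   # one match every 1000 rows

contiguous_blocks = blocks_touched(contiguous_hits, BLOCK_SIZE)
scattered_blocks = blocks_touched(scattered_hits, BLOCK_SIZE)
print(contiguous_blocks, scattered_blocks)
```

With the same number of hits, the scattered layout touches far more blocks than the contiguous one, which is the situation where the query gets slower.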
Here's an example
In [5]: N = 100000000
In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])
In [7]: df['c_id'] = np.random.randint(0,10,size=N)
In [8]: df['f_id'] = np.random.randint(0,10,size=N)
In [9]: df.index = date_range('20130101',periods=N,freq='s')
In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])
In [11]: df.head()
Out[11]:
A B C c_id f_id
2013-01-01 00:00:00 0.037287 1.153534 0.639669 8 7
2013-01-01 00:00:01 1.741046 0.459821 0.194282 8 3
2013-01-01 00:00:02 -2.273919 -0.141789 0.770567 1 1
2013-01-01 00:00:03 0.320879 -0.108426 -1.310302 8 6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362 5 5
2013-01-01 00:00:05 1.608211 0.069196 0.025021 3 6
2013-01-01 00:00:06 -0.561690 0.613579 1.071438 8 2
2013-01-01 00:00:07 1.795043 -0.661966 1.210714 0 0
2013-01-01 00:00:08 0.176347 -0.461176 1.624514 3 6
2013-01-01 00:00:09 -1.084537 1.941610 -1.423559 9 1
2013-01-01 00:00:10 -0.101036 0.925010 -0.809951 0 9
2013-01-01 00:00:11 -1.185520 0.968519 2.871983 7 5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014 3 6
2013-01-01 00:00:13 0.544427 0.130439 0.423749 5 7
2013-01-01 00:00:14 0.112216 0.404801 -0.061730 5 4
2013-01-01 00:00:15 -1.349838 -0.639435 0.993495 0 9
In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop
In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop
In [4]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop
This particular example is 5GB uncompressed and 2.9GB compressed. These results are on the compressed data. In THIS case it is actually quite a bit faster to use the uncompressed file (e.g. the first loop took 3.5s). This is 100MM rows.
So using the last example (4) you are getting 9x the data of the first in a little over 3x the query time.
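The arithmetic behind that statement, as a small sketch. It relies on the test data above having both id columns drawn uniformly from 10 values, so the amount of data returned scales with the number of (f_id, c_id) pairs requested.

```python
# Timings copied from the %timeit runs above (seconds, compressed file).
t_one = 13.9    # f_id=[1] & c_id=[2]
t_three = 42.8  # f_id=[1,2,3] & c_id=[1,2,3]

# Both id columns are uniform over 10 values, so each (f_id, c_id) pair
# matches about 1/100 of the rows.
pairs_one = 1 * 1
pairs_three = 3 * 3

data_ratio = pairs_three / pairs_one
time_ratio = t_three / t_one
print("data: %.0fx, time: %.2fx" % (data_ratio, time_ratio))
```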
However your speedup should be MUCH more, because you won't be selecting on individual timestamps, rather doing that later.
This whole approach takes into account that you have enough main memory to hold your results in the batch sizes (e.g. you are selecting a relatively small part of the set in the batch queries).
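Once a batch result is in memory, the per-incident time windows can be applied there. The following is a minimal sketch under stated assumptions, not a drop-in replacement: `incident_stats` is a hypothetical helper, `result` is assumed to have a DatetimeIndex plus `c_id`, `f_id` and `resp_len` columns, and `inc_times` is assumed to have `c_id`, `f_id`, `start` and `stop` columns as in the question. The statistics mirror the ones in the question's `getIncidentStats`.

```python
import pandas as pd


def incident_stats(result, inc_times):
    """Per-incident statistics from one in-memory batch result.

    Assumes `result` has a DatetimeIndex plus c_id, f_id and resp_len columns,
    and `inc_times` has c_id, f_id, start and stop columns.
    """
    # Split once by id pair so each incident only looks at its own records.
    groups = {key: grp for key, grp in result.groupby(["c_id", "f_id"])}

    rows = []
    for inc in inc_times.itertuples():
        grp = groups.get((inc.c_id, inc.f_id))
        if grp is None:
            continue
        start, stop = pd.to_datetime(inc.start), pd.to_datetime(inc.stop)
        # Half-open window [start, stop), as in the original per-incident query.
        sub = grp[(grp.index >= start) & (grp.index < stop)]
        if sub.empty:
            continue
        # Zero-length incidents get 0.1 s, as in the question's code.
        length = (sub.index.max() - sub.index.min()).total_seconds() or 0.1
        rows.append({
            "incident_id": inc.Index,
            "c_id": inc.c_id,
            "f_id": inc.f_id,
            "Length_sec": length,
            "num_rqsts": len(sub),
            "rqst_rate": len(sub) / length,
            "avg_resp_size": sub.resp_len.mean(),
            "std_resp_size": sub.resp_len.std(),
        })

    columns = ["incident_id", "c_id", "f_id", "Length_sec", "num_rqsts",
               "rqst_rate", "avg_resp_size", "std_resp_size"]
    return pd.DataFrame(rows, columns=columns).set_index("incident_id")
```

Calling this once per batch result and concatenating the returned frames gives one row per incident, without ever issuing a store query per incident.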