Which is the most recommended/pythonic way of handling live incoming data with pandas?
Every few seconds I'm receiving a data point in the format below:
{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
I would like to append it to an existing DataFrame and then run some analysis on it.
The problem is that appending rows one at a time with DataFrame.append returns a new DataFrame on every call, and all that copying can lead to performance issues.
A few people suggested preallocating a big DataFrame and updating it as data comes in:
In [1]: index = pd.date_range(start='2013-01-01 00:00:00', freq='s', periods=5)
In [2]: columns = ['high', 'low', 'open', 'close']
In [3]: df = pd.DataFrame(index=index, columns=columns)
In [4]: df
Out[4]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 NaN NaN NaN NaN
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
In [6]: data_ = pd.Series(data)
In [7]: df.loc[data['time']] = data_
In [8]: df
Out[8]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 4 3 2 1
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
The other alternative is to build a list of dicts: simply append the incoming data to a list and slice it into smaller DataFrames to do the work.
In [9]: ls = []
In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)
In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')
In [12]: df
Out[12]:
close high low open stock
time
2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH
2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH
or something like that, maybe processing the input a little bit more.
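The list-of-dicts idea above could be sketched roughly as follows. This is only a minimal sketch, not a definitive implementation: the names `buffer`, `on_tick`, and `window_frame` are hypothetical, and it assumes every incoming row is a dict shaped like the one in the question.

```python
import pandas as pd

buffer = []  # hypothetical in-memory buffer of incoming rows


def on_tick(row):
    """Store one incoming row; appending to a list does not copy earlier rows."""
    buffer.append(row)


def window_frame(n):
    """Build a DataFrame from the last n buffered rows, indexed by timestamp."""
    df = pd.DataFrame(buffer[-n:])
    df["time"] = pd.to_datetime(df["time"])  # parse the string timestamps
    return df.set_index("time")


# Simulate five incoming data points.
for i in range(5):
    on_tick({"time": f"2013-01-01 00:00:0{i}", "stock": "BLAH",
             "high": 4.0 + i, "low": 3.0, "open": 2.0, "close": 1.0})

recent = window_frame(3)  # DataFrame holding only the three newest rows
```

The DataFrame is only built when analysis is needed, so the per-row cost stays at a plain list append.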
I would use HDF5/pytables as follows.
We define a function which you call with each row d. The function keeps a separate list for each "key", so you can store multiple DataFrames to the HDF5 store in the same process:
import pandas as pd

CACHE = {}
STORE = 'store.h5'  # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.
    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.
    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()
Note: we use the with statement to automatically close the store after each write. It may be faster to keep it open, but if you do, it's recommended that you flush regularly (closing flushes). Also note that a collections.deque might be more readable than a list, but the performance of a list will be slightly better here.
To use this you call as:
process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
key="df")
Note: "df" is the stored key used in the pytables store.
Once the job has finished, ensure you store_and_clear the remaining cache:
for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)
Now your complete DataFrame is available via:
with pd.HDFStore(STORE) as store:
    df = store["df"]  # other keys will be store[key]
len(df)
Additionally, to get up-to-date reads you could define a get method which stores and clears before reading. In this way you would get the most up-to-date data:
def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]
Now when you access with:
df = get_latest("df")
you'll get the latest "df" available.
Another option is slightly more involved: define a custom table in vanilla pytables, see the tutorial.
Note: You need to know the field-names to create the column descriptor.
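A custom table in vanilla pytables might look roughly like the sketch below. It is an illustration under assumptions rather than the tutorial's own code: the class name `Tick`, the node name `ticks`, the temporary file path, and the string widths are all hypothetical, and the field names are taken from the rows in the question.

```python
import os
import tempfile

import tables


class Tick(tables.IsDescription):
    """Column descriptor; field names and widths are assumptions."""
    time = tables.StringCol(19)   # 'YYYY-MM-DD HH:MM:SS'
    stock = tables.StringCol(8)
    high = tables.Float64Col()
    low = tables.Float64Col()
    open = tables.Float64Col()
    close = tables.Float64Col()


incoming = [
    {"time": "2013-01-01 00:00:00", "stock": "BLAH",
     "high": 4.0, "low": 3.0, "open": 2.0, "close": 1.0},
    {"time": "2013-01-01 00:00:01", "stock": "BLAH",
     "high": 4.5, "low": 3.5, "open": 2.5, "close": 1.5},
]

path = os.path.join(tempfile.mkdtemp(), "ticks.h5")  # hypothetical location

with tables.open_file(path, mode="w") as h5:
    table = h5.create_table("/", "ticks", Tick)
    row = table.row
    for d in incoming:
        row["time"] = d["time"].encode("ascii")    # string columns hold bytes
        row["stock"] = d["stock"].encode("ascii")
        for name in ("high", "low", "open", "close"):
            row[name] = d[name]
        row.append()   # queue the row in the table's write buffer
    table.flush()      # write buffered rows to disk

with tables.open_file(path, mode="r") as h5:
    n_rows = h5.root.ticks.nrows
    highs = h5.root.ticks.col("high").tolist()
```

Because the descriptor fixes the column names and types up front, every incoming dict has to supply exactly those fields.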
You are actually trying to solve two problems: capturing real-time data and analyzing that data. The first problem can be solved with Python logging, which is designed for this purpose. Then the other problem can be solved by reading that same log file.
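As a rough sketch of that split, the capture side could write each data point as one JSON line through the standard logging module, and the analysis side could parse the same file. The logger name `ticks`, the temporary log path, and the helper names `capture` and `read_rows` are hypothetical, and writing rows as JSON is an assumption made here for easy parsing.

```python
import json
import logging
import os
import tempfile

# Hypothetical log location; a real deployment would pick a fixed path.
log_path = os.path.join(tempfile.mkdtemp(), "ticks.log")

logger = logging.getLogger("ticks")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep data rows out of the root logger's output
handler = logging.FileHandler(log_path)
handler.setFormatter(logging.Formatter("%(message)s"))  # one JSON object per line
logger.addHandler(handler)


def capture(row):
    """Capture side: write one data point as a JSON line."""
    logger.info(json.dumps(row))


def read_rows(path):
    """Analysis side: parse every JSON line written so far."""
    with open(path) as fh:
        return [json.loads(line) for line in fh if line.strip()]


capture({"time": "2013-01-01 00:00:00", "stock": "BLAH",
         "high": 4.0, "low": 3.0, "open": 2.0, "close": 1.0})
capture({"time": "2013-01-01 00:00:01", "stock": "BLAH",
         "high": 4.5, "low": 3.5, "open": 2.5, "close": 1.5})

rows = read_rows(log_path)
```

The resulting list of dicts can be passed to pd.DataFrame(rows) whenever the analysis needs a DataFrame.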