The streamz and hvplot packages work together to provide support for plotting streaming data using pandas dataframes.
For example, the streamz package has a convenience utility for creating a random streaming dataframe:
import hvplot.streamz
from streamz.dataframe import Random
sdf = Random(interval='200ms', freq='50ms')
sdf
# Stop the streaming with: sdf.stop()
This can be plotted trivially in a streaming chart using hvplot:
sdf.hvplot()
Is there a simple way of streaming data from a pre-existing pandas dataframe?
For example, I'd like to be able to say something like:
import pandas as pd
df=pd.DataFrame({'a':range(0,100),'b':range(5,105)})
sdf = StreamingDataFrame(df, interval='200ms', freq='50ms')
Then, rather than using random example data, I could trivially use example data from a pre-existing pandas dataframe.
This is as far as I got...
from streamz import Stream
from streamz.dataframe import DataFrame
from time import sleep
from datetime import datetime
#Set up a source stream
source = Stream()
#Create a sample pandas dataframe
samples = pd.DataFrame({'x':[0],'y':[0]})
#The streaming dataframe takes the source stream and sample pandas dataframe
#The sample defines the dataframe schema, maybe?
sdf = DataFrame(source, example=samples)
def stest(r):
    print(datetime.now())
    print(r)
#I don't recall what this does
#I think what I was looking to do was display the last 3 items...?
# ...which this doesn't appear to do!
df = sdf.window(3).full()
#This seems to set a callback on stest when a stream element appears
df.stream.sink(stest)
for i in range(5):
    #pull the next item in the streaming dataframe into the stream
    #We could iloc on an existing dataframe?
    source.emit(pd.DataFrame({'x': [i,i,i], 'y' :[i,i,i]}))
    #Pause for a short while...
    sleep(2)
--------
2020-01-27 19:13:06.816315
x y
0 0 0
1 0 0
2 0 0
2020-01-27 19:13:08.824016
x y
0 1 1
1 1 1
2 1 1
2020-01-27 19:13:10.829178
x y
0 2 2
1 2 2
2 2 2
2020-01-27 19:13:12.835948
x y
0 3 3
1 3 3
2 3 3
2020-01-27 19:13:14.843432
x y
0 4 4
1 4 4
2 4 4
....
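One possible reading of that output is that the window is doing what was asked after all. If `window(3)` keeps the most recent 3 rows (my understanding of streamz, not verified here), then emitting 3 rows at a time means every window is exactly the chunk that was just emitted. A pandas-only sketch of that behaviour follows; `last_n_rows` is a hypothetical stand-in for a row-count window, not the streamz implementation:

```python
import pandas as pd

def last_n_rows(chunks, n):
    """Yield the trailing n rows seen so far after each chunk arrives.

    Hypothetical stand-in for a row-count window; not streamz code.
    """
    buffer = None
    for chunk in chunks:
        buffer = chunk if buffer is None else pd.concat([buffer, chunk])
        buffer = buffer.tail(n)
        yield buffer

# Three rows per emit, as in the loop above: each window is just the latest chunk
three_row_chunks = [pd.DataFrame({'x': [i, i, i], 'y': [i, i, i]}) for i in range(3)]
windows_3 = list(last_n_rows(three_row_chunks, 3))

# One row per emit: the window now spans the last three emits
one_row_chunks = [pd.DataFrame({'x': [i], 'y': [i]}) for i in range(5)]
windows_1 = list(last_n_rows(one_row_chunks, 3))
print(windows_1[-1])
```

If that reading is right, emitting one row per step rather than three is what would make a "last 3 items" window visible.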
Ah.. found an example that does seem to work better.
Set up a dataframe we want to stream:
import pandas as pd
from time import sleep
from datetime import datetime
from streamz import Stream
from streamz.dataframe import DataFrame
#Set up a dummy dataframe
ddf=pd.DataFrame({'a':range(1,5),'b':range(11,15)})
print(ddf)
---
a b
0 1 11
1 2 12
2 3 13
3 4 14
And stream it...
#Create a stream source
source = Stream()
#Create a dataframe model for the stream
samples = pd.DataFrame({'a':[0],'b':[0]})
#Creating the streaming dataframe
sdf = DataFrame(source, example=samples)
#That window thing again...
df = sdf.window(4).full()
#Function to run when an item appears on the stream
def stest(r):
    print(datetime.now())
    print(r)
#Add the callback function to streamer
df.stream.sink(stest)
#This does stream from the dataframe
for i in range(len(ddf)):
    source.emit(ddf.iloc[i])
    sleep(2)
df
---
2020-01-27 19:28:05.503123
a 1
b 11
Name: 0, dtype: int64
2020-01-27 19:28:07.505536
a 1
b 11
a 2
b 12
dtype: int64
2020-01-27 19:28:09.508925
a 2
b 12
a 3
b 13
dtype: int64
2020-01-27 19:28:11.514117
a 3
b 13
a 4
b 14
dtype: int64
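The stacked `a`/`b` output above looks like Series being concatenated: `ddf.iloc[i]` returns a Series, whereas a slice (or `ddf.iloc[[i]]`) returns a DataFrame with the same columns as the `example`. A minimal sketch of a helper that emits DataFrame chunks instead follows. `emit_frame` and `ListSource` are hypothetical names, and `ListSource` only stands in for anything with an `emit` method, such as the `Stream()` above:

```python
import time
import pandas as pd

def emit_frame(df, source, chunk_size=1, delay=0.0):
    """Push df into source.emit() as consecutive DataFrame chunks.

    Hypothetical helper: `source` is any object with an emit() method.
    Returns the number of chunks emitted.
    """
    count = 0
    for start in range(0, len(df), chunk_size):
        # Slicing keeps the chunk a DataFrame (iloc[i] would give a Series)
        source.emit(df.iloc[start:start + chunk_size])
        count += 1
        if delay:
            time.sleep(delay)
    return count

class ListSource:
    """Stand-in source that just records what it is given."""
    def __init__(self):
        self.items = []

    def emit(self, item):
        self.items.append(item)

ddf = pd.DataFrame({'a': range(1, 5), 'b': range(11, 15)})
collector = ListSource()
n_chunks = emit_frame(ddf, collector)
print(type(ddf.iloc[0]).__name__, type(collector.items[0]).__name__)
```

With the objects from the code above this would presumably be called as `emit_frame(ddf, source, delay=2)`, though that call is untested here against streamz.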
I also found another way of getting data to stream into a HoloViews plot:
from tornado import gen
from tornado.ioloop import PeriodicCallback
from holoviews.streams import Buffer
import holoviews as hv
hv.extension('bokeh')
import numpy as np
import pandas as pd
df = pd.DataFrame({'x':range(1000), 'y':np.sin(range(1000))})
rowcount = 0
maxrows = 1000
dfbuffer = Buffer(np.zeros((0, 2)), length=20)
@gen.coroutine
def g():
    global rowcount
    item = df[['x','y']].iloc[rowcount].values.tolist()
    dfbuffer.send(np.array([item]))
    rowcount += 1
    if rowcount>=maxrows:
        cbdf.stop()
#How can we get the thing to stop?
cbdf = PeriodicCallback(g, 500)
cbdf.start()
hv.DynamicMap(hv.Curve, streams=[dfbuffer]).opts(padding=0.1, width=600, color='green')
And then stop it with: cbdf.stop()
(only this didn't seem to work for me when I just tried it...)
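One way to make stopping more predictable might be to drop the `global` counter and have the feeding step report when it has run out of rows, so the caller decides when to call `stop()`. A sketch under that assumption follows; `RowFeeder` is a hypothetical name, and `send` is any callable that accepts a 2-D array (in the code above that would be `dfbuffer.send`):

```python
import numpy as np
import pandas as pd

class RowFeeder:
    """Send one row of df per step() call; hypothetical helper."""

    def __init__(self, df, send, columns=('x', 'y')):
        self.values = df[list(columns)].to_numpy()
        self.send = send
        self.position = 0

    def step(self):
        """Send the next row. Return False once there is nothing left."""
        if self.position >= len(self.values):
            return False
        # Keep the row 2-D (shape (1, n_columns)) to match the buffer layout
        self.send(self.values[self.position:self.position + 1])
        self.position += 1
        return self.position < len(self.values)

df = pd.DataFrame({'x': range(5), 'y': np.sin(range(5))})
received = []
feeder = RowFeeder(df, received.append)
while feeder.step():
    pass
print(len(received), received[0].shape)
```

In the notebook this could be driven by a small wrapper passed to `PeriodicCallback` that calls `cbdf.stop()` when `feeder.step()` returns `False`. If `cbdf.stop()` appears to do nothing, one thing worth checking (a guess, not confirmed for this case) is whether the cell was run more than once: each run creates a new `PeriodicCallback`, and `cbdf` then refers only to the latest one while earlier ones keep firing.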
I don't seem to have got as far as an example for myself of connecting a streamz component to the chart (unless holoviews is using streamz underneath?)