I'm trying to simulate reading data from a queue such as Kafka. While streaming data from a file, I need to maintain a pointer to the current record. Currently I keep this pointer in a global variable, but that variable appears to be shared across all user sessions.
How can I save this per-user state in Bokeh?
def modify_doc(doc):
    """Set up a plot on ``doc`` and stream further CSV rows into it.

    The first ten rows of ``data.csv`` seed the data source; a periodic
    callback then appends one additional row per tick.
    """
    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    initial_rows = df_all[0:10]
    source = ColumnDataSource(data=initial_rows.to_dict(orient='list'))
    ...

    def callback():
        # FIXME: how can we save the current_record in the user's session?
        # current_record is a module-level global, so every caller of this
        # callback reads and advances the same pointer.
        global current_record
        try:
            current_record
        except NameError:
            # first tick in this process: continue right after the seed rows
            current_record = 10

        next_row = df_all[current_record:current_record + 1]
        if len(next_row) == 0:
            # no more records to display
            return

        # we have another record so display it
        source.stream(next_row.to_dict(orient='list'))
        current_record += 1

    doc.add_root(plot)
    doc.add_periodic_callback(callback, 250)
I've seen the documentation for ClientSession, but this appears to work at the whole document level?
I've included a Minimal, Complete and Verifiable example below:
file: bokeh_server.py
Run locally with: python3 bokeh_server.py
import pandas as pd
from tornado.ioloop import IOLoop
import yaml
from jinja2 import Template
from bokeh.application.handlers import FunctionHandler
from bokeh.application import Application
from bokeh.layouts import column
from bokeh.models import ColumnDataSource, Slider, Div
from bokeh.plotting import figure
from bokeh.server.server import Server
from bokeh.themes import Theme
from bokeh.client import push_session
import os
# if running locally, listen on port 5000
PORT = int(os.getenv('PORT', '5000'))
HOST = "0.0.0.0"

# ALLOW_WEBSOCKET_ORIGIN is set in the cloud foundry manifest as a
# comma-separated list of origins. Read it explicitly instead of relying on
# a bare ``except`` to catch the AttributeError raised by ``None.split``;
# surrounding whitespace and empty entries are dropped.
_configured_origins = [
    origin.strip()
    for origin in os.getenv("ALLOW_WEBSOCKET_ORIGIN", "").split(',')
    if origin.strip()
]

# When the variable is unset (or empty) we are not running on cloud foundry,
# so we must be running locally.
ALLOW_WEBSOCKET_ORIGIN = _configured_origins or ['localhost:{0}'.format(PORT)]
# Tornado IOLoop for this script: it is passed to the Bokeh Server below
# and started at the bottom of the file.
io_loop = IOLoop.current()
# This example simulates reading from a stream such as kafka
def modify_doc(doc):
    """Populate ``doc`` with a datetime line plot fed one CSV row at a time.

    The first ten rows of ``data.csv`` are plotted immediately; a periodic
    callback registered on ``doc`` then streams one further row per tick
    until the file is exhausted.
    """
    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    source = ColumnDataSource(data=df_all[0:10].to_dict(orient='list'))

    plot = figure(
        x_axis_type='datetime',
        y_range=(0, 10000000),
        y_axis_label='Y Label',
        title="Title",
    )
    # One line per series, all drawn from the same data source.
    for column_name, line_color in (
        ('ALL_EXCL_FUEL', 'blue'),
        ('MOSTLY_FOOD', 'lightblue'),
        ('NON_SPECIALISED', 'grey'),
    ):
        plot.line('Date', column_name, color=line_color, alpha=1, source=source)

    def callback():
        # FIXME: how can we save this in the user's session?
        # counter is a module-level global, so every invocation of this
        # callback reads and advances the same value.
        global counter
        try:
            counter
        except NameError:
            # first tick in this process: continue right after the seed rows
            counter = 10

        next_row = df_all[counter:counter + 1]
        if len(next_row) == 0:
            # ran off the end of the file; nothing left to stream
            return

        # hardcode update values for now
        source.stream(next_row.to_dict(orient='list'))
        counter += 1

    doc.add_root(plot)
    doc.add_periodic_callback(callback, 250)
# Wrap modify_doc in a Bokeh application and serve it at the root path.
bokeh_app = Application(FunctionHandler(modify_doc))
server = Server(
    {'/': bokeh_app},
    io_loop=io_loop,
    allow_websocket_origin=ALLOW_WEBSOCKET_ORIGIN,
    port=PORT,
    address=HOST,
)
server.start()

if __name__ == '__main__':
    # Schedule server.show("/") on the loop so it runs once the loop is
    # started, then start the loop.
    io_loop.add_callback(server.show, "/")
    io_loop.start()
file: data.csv
Date,ALL_EXCL_FUEL,MOSTLY_FOOD,NON_SPECIALISED,TEXTILE,HOUSEHOLD,OTHER,NON_STORE
1986 Jan,1883154,747432,163708,267774,261453,281699,161088
1986 Feb,1819796,773161,152656,223836,246502,275121,148519
1986 Mar,1912582,797104,169440,251438,249614,292348,152638
1986 Apr,1974419,809334,170540,275975,260086,299271,159213
1986 May,1948915,800193,170173,274979,251175,297655,154740
1986 Jun,2019114,821785,178366,295463,251507,311447,160546
1986 Jul,2051539,816033,184812,297969,269786,323187,159752
1986 Aug,2011746,804386,180911,297138,263427,310220,155665
1986 Sep,2046678,792943,181055,305350,280640,318368,168322
1986 Oct,2110669,810147,187728,308919,298637,325617,179621
1986 Nov,2315710,847794,231599,352009,332079,358077,194152
1986 Dec,2830206,970987,319570,490001,373714,469399,206536
1987 Jan,2032021,798562,172215,288186,288534,307900,176624
1987 Feb,1980748,805713,165682,247219,282836,313577,165721
1987 Mar,2009717,816051,174034,256756,280207,315562,167106
1987 Apr,2156967,862749,189729,308543,284440,336755,174751
1987 May,2075808,834375,175464,287515,280404,330093,167957
1987 Jun,2137092,844051,183014,304706,286522,345149,173651
1987 Jul,2208377,847098,198848,330804,301537,356037,174054
I performed some testing and found that every time a new browser session was opened with the Bokeh chart URL, a new Bokeh Document
instance was created. The answer for me was to save the state on the document instance:
def modify_doc(doc):
    """Build a streaming plot on ``doc``, keeping the read pointer on ``doc``.

    The index of the next record to stream is stored as the attribute
    ``doc.realtime_rec_ptr`` instead of in a module-level global, so the
    pointer belongs to the document passed in rather than to the process.

    Args:
        doc: The Bokeh document to populate. It gains a
            ``realtime_rec_ptr`` attribute, a plot root and a periodic
            callback.
    """
    # The first 100 records of data.csv will be loaded immediately.
    # The remaining records will be read one-by-one in the update
    # callback, which is used to simulate new, realtime data arriving.
    doc.realtime_rec_ptr = 100

    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    # Copy the slice before adding a column so the assignment neither
    # writes through to df_all nor triggers SettingWithCopyWarning.
    start_data_df = df_all[0:doc.realtime_rec_ptr].copy()
    start_data_df['color'] = 'green'
    src = ColumnDataSource(data=start_data_df.to_dict(orient='list'))

    p = figure(x_axis_type='datetime', title="Title",
               y_range=(0, 10000000), y_axis_label='Y Label')
    p.line('Date', 'ALL_EXCL_FUEL', color='blue', alpha=1, source=src)
    # Markers for the initially loaded records are green; markers for
    # records streamed in by the callback are blue.
    p.circle('Date', 'ALL_EXCL_FUEL', color='color', fill_alpha=0.2, size=4,
             source=src)

    def callback():
        """Stream the next record, if any, and advance the document's pointer."""
        next_rec = doc.realtime_rec_ptr
        df = df_all[next_rec:next_rec + 1].copy()
        if df.empty:
            # All records have been streamed; leave the pointer where it is.
            return
        df['color'] = 'blue'
        src.stream(df.to_dict(orient='list'))
        doc.realtime_rec_ptr = next_rec + 1

    doc.add_root(p)
    doc.add_periodic_callback(callback, 250)
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With