Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to store and retrieve state in a callback()?

Tags:

bokeh

I'm trying to simulate reading data from a queue such as Kafka. To do this I stream records from a file and need to maintain a pointer to the current record. Currently I'm doing this with a global variable, but that variable appears to be shared across all user sessions.

How can I save this user state in bokeh?

def modify_doc(doc):
    """Set up a streaming plot on ``doc`` backed by data.csv.

    The first ten records seed the data source; a periodic callback then
    appends one further record per tick. The read position lives in the
    module-level global ``current_record``, so it is not per-session state.
    """
    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    initial_rows = df_all[0:10]
    source = ColumnDataSource(data=initial_rows.to_dict(orient='list'))
    ...

    def callback():
        # FIXME: how can we save the current_record in the user's session?
        global current_record
        if 'current_record' not in globals():
            # first tick: start right after the ten records already shown
            current_record = 10

        df = df_all[current_record:current_record + 1]
        if df.shape[0] == 0:
            # no records left to stream
            return

        # we have another record so display it
        source.stream(df.to_dict(orient='list'))
        current_record += 1

    doc.add_root(plot)
    doc.add_periodic_callback(callback, 250)

I've seen the documentation for ClientSession, but this appears to work at the whole document level?


I've included a Minimal, Complete and Verifiable example below:

file: bokeh_server.py

Run locally with: python3 bokeh_server.py

import pandas as pd
from tornado.ioloop import IOLoop
import yaml
from jinja2 import Template

from bokeh.application.handlers import FunctionHandler
from bokeh.application import Application
from bokeh.layouts import column
from bokeh.models import ColumnDataSource, Slider, Div
from bokeh.plotting import figure
from bokeh.server.server import Server
from bokeh.themes import Theme
from bokeh.client import push_session

import os

# if running locally, listen on port 5000
PORT = int(os.getenv('PORT', '5000'))
HOST = "0.0.0.0"

try:
    # This is set in the cloud foundry manifest. If we are running on
    # cloud foundry, this will be set for us (comma-separated origins).
    ALLOW_WEBSOCKET_ORIGIN = os.getenv("ALLOW_WEBSOCKET_ORIGIN").split(',')
except AttributeError:
    # os.getenv() returned None, i.e. the variable is unset: we are not
    # running on cloud foundry so we must be running locally.
    # Only AttributeError is caught so unrelated errors are not hidden.
    ALLOW_WEBSOCKET_ORIGIN = ['localhost:{0}'.format(PORT)]


# Event loop handed to the bokeh Server constructed further down and
# started in the __main__ guard at the bottom of this script.
io_loop = IOLoop.current()

# This example simulates reading from a stream such as kafka

def modify_doc(doc):
    """Populate ``doc`` with a line plot fed from data.csv.

    The first ten rows are plotted immediately; a periodic callback
    (period 250) then streams one further row per tick. The read position
    is kept in the module-level global ``counter``, so it is shared rather
    than per-session.
    """
    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    # Seed the data source with the first ten rows.
    source = ColumnDataSource(data=df_all[0:10].to_dict(orient='list'))

    plot = figure(
        x_axis_type='datetime',
        y_range=(0, 10000000),
        y_axis_label='Y Label',
        title="Title",
    )

    # One line per series, all drawn from the same data source.
    series_colors = (
        ('ALL_EXCL_FUEL', 'blue'),
        ('MOSTLY_FOOD', 'lightblue'),
        ('NON_SPECIALISED', 'grey'),
    )
    for column, line_color in series_colors:
        plot.line('Date', column, color=line_color, alpha=1, source=source)

    def callback():
        # FIXME: how can we save this in the user's session?
        global counter
        if 'counter' not in globals():
            # first tick: continue after the ten rows already plotted
            counter = 10

        next_row = df_all[counter:counter + 1]
        if next_row.shape[0] == 0:
            # nothing left to stream
            return

        # hardcode update values for now
        source.stream(next_row.to_dict(orient='list'))
        counter += 1

    doc.add_root(plot)
    doc.add_periodic_callback(callback, 250)


# Wrap modify_doc in an Application and serve it at the root path.
bokeh_app = Application(FunctionHandler(modify_doc))

server = Server(
    {'/': bokeh_app},
    io_loop=io_loop,
    allow_websocket_origin=ALLOW_WEBSOCKET_ORIGIN,
    port=PORT,
    address=HOST,
)
server.start()

if __name__ == '__main__':
    # Schedule server.show("/") on the loop, then run the loop.
    io_loop.add_callback(server.show, "/")
    io_loop.start()

file: data.csv

Date,ALL_EXCL_FUEL,MOSTLY_FOOD,NON_SPECIALISED,TEXTILE,HOUSEHOLD,OTHER,NON_STORE
1986 Jan,1883154,747432,163708,267774,261453,281699,161088
1986 Feb,1819796,773161,152656,223836,246502,275121,148519
1986 Mar,1912582,797104,169440,251438,249614,292348,152638
1986 Apr,1974419,809334,170540,275975,260086,299271,159213
1986 May,1948915,800193,170173,274979,251175,297655,154740
1986 Jun,2019114,821785,178366,295463,251507,311447,160546
1986 Jul,2051539,816033,184812,297969,269786,323187,159752
1986 Aug,2011746,804386,180911,297138,263427,310220,155665
1986 Sep,2046678,792943,181055,305350,280640,318368,168322
1986 Oct,2110669,810147,187728,308919,298637,325617,179621
1986 Nov,2315710,847794,231599,352009,332079,358077,194152
1986 Dec,2830206,970987,319570,490001,373714,469399,206536
1987 Jan,2032021,798562,172215,288186,288534,307900,176624
1987 Feb,1980748,805713,165682,247219,282836,313577,165721
1987 Mar,2009717,816051,174034,256756,280207,315562,167106
1987 Apr,2156967,862749,189729,308543,284440,336755,174751
1987 May,2075808,834375,175464,287515,280404,330093,167957
1987 Jun,2137092,844051,183014,304706,286522,345149,173651
1987 Jul,2208377,847098,198848,330804,301537,356037,174054
like image 477
Chris Snow Avatar asked Jun 30 '17 07:06

Chris Snow


1 Answer

I performed some testing and found that every time a new browser session was opened with the Bokeh chart URL, a new Bokeh Document instance was created. The solution for me was to save the state on the Document instance:

def modify_doc(doc):
    """Build a streaming plot whose read position is stored on ``doc``.

    The record pointer is kept as the attribute ``doc.realtime_rec_ptr``
    instead of a module-level global, so each Document passed to this
    function carries its own position.

    Args:
        doc: the bokeh Document to populate; it gains a
            ``realtime_rec_ptr`` attribute, a plot root and a periodic
            callback.
    """
    # The first 100 records of data.csv will be loaded immediately
    # The remaining records will be read one-by-one in the update
    # callback which is used to simulate new, realtime data arriving

    doc.realtime_rec_ptr = 100

    df_all = pd.read_csv('data.csv')
    df_all['Date'] = pd.to_datetime(df_all['Date'])

    # Copy the slice so adding the 'color' column writes to an independent
    # frame (avoids pandas' SettingWithCopyWarning).
    start_data_df = df_all[0:doc.realtime_rec_ptr].copy()
    start_data_df['color'] = 'green'

    src = ColumnDataSource(data=start_data_df.to_dict(orient='list'))

    p = figure(x_axis_type='datetime', title="Title",
               y_range=(0, 10000000), y_axis_label='Y Label')

    p.line('Date', 'ALL_EXCL_FUEL', color='blue', alpha=1, source=src)

    # initially loaded markers are colored green, realtime (streamed)
    # markers are colored blue
    p.circle('Date', 'ALL_EXCL_FUEL', color='color', fill_alpha=0.2, size=4,
             source=src)

    def callback():
        # Read the next record using this document's own pointer.
        ptr = doc.realtime_rec_ptr
        df = df_all[ptr:ptr + 1].copy()

        if df.shape[0] > 0:
            df['color'] = 'blue'
            new_data = df.to_dict(orient='list')
            src.stream(new_data)
            doc.realtime_rec_ptr = ptr + 1

    doc.add_root(p)
    doc.add_periodic_callback(callback, 250)
like image 133
Chris Snow Avatar answered Oct 12 '22 23:10

Chris Snow