
Save Websocket Data to Pandas

I'm trying to consume websocket data and save it to a pandas DataFrame for other functions to use. However, I'm very unfamiliar with websockets and originally got errors because I was trying to pass a df argument directly to on_message(). This link suggests using partial to add arguments, but I am still getting this error: error from callback <function on_message at 0x000002A098207510>: local variable 'df' referenced before assignment.

I realise there are better ways to process the data than a DataFrame, but I'd just like to get that working first. Thanks.

import websocket
import pandas as pd
import json
from functools import partial

# create empty df
df = pd.DataFrame(columns=['foreignNotional','grossValue','homeNotional','price','side','size','symbol','tickDirection',
                          'timestamp','trdMatchID'])

def on_message(ws, message):
    msg = json.loads(message)
    print(msg)
    df = df
    df = df.append(msg)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    return

func = partial(on_message, df)
ws.on_message = func

if __name__ == "__main__":
    #websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever()
swifty asked Mar 07 '23 14:03



1 Answer

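Assigning to df inside on_message makes Python treat df as a local variable for the whole function, which is why reading it before the assignment fails. Since you have to modify an external object, and pandas.DataFrame.append does not allow in-place modification, you have to either make all the functions and df available as members of some class, so that you can write self.df = self.df.append(msg), or use global: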

import json

import pandas as pd
import websocket

df = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side',
                           'size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID'])


def on_message(ws, message):
    msg = json.loads(message)
    print(msg)
    global df
    # `ignore_index=True` has to be provided, otherwise you'll get
    # "Can only append a Series if ignore_index=True or if the Series has a name" errors
    df = df.append(msg, ignore_index=True)


def on_error(ws, error):
    print(error)


def on_close(ws):
    print("### closed ###")


def on_open(ws):
    return


if __name__ == "__main__":
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",
                                on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.run_forever()
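
For the class-based alternative mentioned above, here is a minimal sketch. The class name TradeCollector and its attributes are hypothetical, chosen only for illustration. Two assumptions are baked in: that trade rows arrive as a list of dicts under a "data" key, and that messages without that key (such as subscription acknowledgements) can be skipped. Because DataFrame.append was removed in pandas 2.0, the sketch does not call it; it collects rows in a plain list and builds the DataFrame on demand instead.

```python
import json

import pandas as pd

COLUMNS = ['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side',
           'size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID']


class TradeCollector:
    """Hypothetical helper that owns the state the callback writes to."""

    def __init__(self):
        # Rows live on the instance, so no global statement is needed.
        self.rows = []

    def on_message(self, ws, message):
        msg = json.loads(message)
        # Assumption: trade rows are a list of dicts under "data";
        # any message without that key is ignored.
        data = msg.get("data") if isinstance(msg, dict) else None
        if data:
            self.rows.extend(data)

    def to_frame(self):
        # Build the DataFrame once, when other code actually needs it.
        return pd.DataFrame(self.rows, columns=COLUMNS)
```

To wire it up, create collector = TradeCollector() and pass on_message=collector.on_message to websocket.WebSocketApp; the bound method still receives (ws, message). Other functions can then call collector.to_frame() whenever they need the data collected so far.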
Eugene Pakhomov answered Mar 29 '23 07:03
