I'm migrating my financial analysis application's data from MongoDB to InfluxDB because the data and the analysis are growing exponentially.
My current scenario is:
1) Get the tick every second from the exchanges and store it in a measurement called 'tick';
2) Have a continuous query running every 10 seconds grouping this 'tick' data by minute into a measurement called 'ohlc' (candlestick data);
And here come my doubts. When I was using Mongo as my database, I transformed the ticks into candlestick data the moment I received them, calculated some indicators (MACD, EMA, BB, RSI), and stored the results.
I see that InfluxDB has Kapacitor as its data processor. Is there a way to write scripts in Kapacitor to calculate these indicators, or should I stream the data to NodeJS and calculate them myself?
If I have to stream the data, what are the best practices for doing it?
There are a few options when you're using InfluxDB. With Kapacitor, you can incorporate user-defined functions (UDFs) written in any language that has protocol buffer support, or you can write a TICKscript to do the data transformation.
You can also use the Continuous Queries feature of the database, although these can become expensive depending on the query itself and the interval it runs on.
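For example, the tick-to-candlestick step you describe could be a continuous query. A minimal sketch, assuming a database named 'market', a 'price' field on the 'tick' measurement, and a 'symbol' tag (all of these names are assumptions, adjust them to your schema):

    CREATE CONTINUOUS QUERY "cq_tick_to_ohlc" ON "market"
    BEGIN
      -- open/high/low/close per one-minute bucket, per symbol
      SELECT first("price") AS "open",
             max("price")   AS "high",
             min("price")   AS "low",
             last("price")  AS "close"
      INTO "ohlc"
      FROM "tick"
      GROUP BY time(1m), "symbol"
    END

By default a CQ executes once per GROUP BY time() interval; if you want it to recompute every 10 seconds like your current setup, InfluxQL lets you add a RESAMPLE EVERY 10s clause after the ON "market" part.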
If you want to write your own function in NodeJS, you basically write a small process that listens on a unix domain socket; Kapacitor connects to that socket and the data is exchanged over that connection (full docs here).
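A minimal sketch of that socket listener in NodeJS. The socket path and handler body are placeholders; the actual messages Kapacitor exchanges are length-prefixed protocol buffers (defined in Kapacitor's udf.proto), which you would decode and encode inside the data handler:

    const net = require('net');

    // Hypothetical socket path -- it must match the path configured in kapacitor.conf.
    const SOCKET_PATH = '/tmp/indicators.sock';

    const server = net.createServer((conn) => {
      conn.on('data', (chunk) => {
        // Decode the protobuf request from Kapacitor, compute your indicator
        // (EMA, MACD, RSI, ...), and write an encoded response back on conn.
      });
    });

    server.listen(SOCKET_PATH, () => {
      console.log('Kapacitor UDF socket listening on ' + SOCKET_PATH);
    });

You then point Kapacitor at that socket in kapacitor.conf under a [udf.functions.<name>] section (with socket and timeout settings) and invoke it from a TICKscript with @<name>().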
If you want to write a TICKscript, here's an example (a generic threshold/sigma alert template, where the placeholders in curly braces are filled in per alert):
// {alert_name}
// metric: {alert_metric}
// available_fields: [[other_telegraf_fields]]
//
// TELEGRAF CONFIGURATION
// [inputs.{plugin}]
//   # full configuration
//
// DEFINE: kapacitor define {alert_name} -type stream -tick {plugin}/{alert_name}.tick -dbrp telegraf.autogen
// ENABLE: kapacitor enable {alert_name}

// Parameters
var info = {info_level}
var warn = {warn_level}
var crit = {crit_level}
var infoSig = 2.5
var warnSig = 3
var critSig = 3.5
var period = 10s
var every = 10s

// Dataframe
var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement({plugin})
        .groupBy('host')
    |window()
        .period(period)
        .every(every)
    |mean({alert_metric})
        .as('stat')

// Thresholds
var alert = data
    |eval(lambda: sigma("stat"))
        .as('sigma')
        .keep()
    |alert()
        .id('{{ index .Tags "host"}}/{alert_metric}')
        .message('{{ .ID }}:{{ index .Fields "stat" }}')
        .info(lambda: "stat" > info OR "sigma" > infoSig)
        .warn(lambda: "stat" > warn OR "sigma" > warnSig)
        .crit(lambda: "stat" > crit OR "sigma" > critSig)

// Alert
alert
    .log('/tmp/{alert_name}_log.txt')
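And here's a sketch closer to your use case: a stream TICKscript that computes a simple moving average of the tick price and writes it back into InfluxDB. The database, measurement, field, and tag names are assumptions to match the CQ example above; heavier indicators like MACD or RSI would go through a UDF as described earlier:

    // Compute a 10-minute moving average of price per symbol, refreshed every minute,
    // and write the result into an 'indicators' measurement.
    stream
        |from()
            .database('market')
            .retentionPolicy('autogen')
            .measurement('tick')
            .groupBy('symbol')
        |window()
            .period(10m)
            .every(1m)
        |mean('price')
            .as('sma_10m')
        |influxDBOut()
            .database('market')
            .retentionPolicy('autogen')
            .measurement('indicators')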
I hope that helps!