We are storing small documents in ES that represent a sequence of events for an object. Each event has a date/time stamp. We need to analyze the time between events for all objects over a period of time.
For example, imagine these event JSON documents:
{ "object":"one", "event":"start", "datetime":"2016-02-09 11:23:01" }
{ "object":"one", "event":"stop", "datetime":"2016-02-09 11:25:01" }
{ "object":"two", "event":"start", "datetime":"2016-01-02 11:23:01" }
{ "object":"two", "event":"stop", "datetime":"2016-01-02 11:24:01" }
What we would want to get out of this is a histogram plotting the two resulting time stamp deltas (from start to stop): 2 minutes / 120 seconds for object one and 1 minute / 60 seconds for object two.
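To make that concrete, the kind of per-object result we are after would look something like this (the duration field name is just for illustration):
{ "object":"one", "duration":120 }
{ "object":"two", "duration":60 }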
Ultimately we want to monitor the time between start and stop events, but that requires calculating the time between those events and then either aggregating the results ourselves or handing them to the Kibana UI to be aggregated/plotted. Ideally we would like to feed the results directly to Kibana so we can avoid building any custom UI.
Thanks in advance for any ideas or suggestions.
Since you're open to using Logstash, there's a way to do it using the aggregate filter. Note that this is a community plugin that needs to be installed first (i.e. it doesn't ship with Logstash by default).
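If you don't have it yet, you can install it with the plugin tool that ships with Logstash; depending on your Logstash version the command is one of:
bin/plugin install logstash-filter-aggregate            # older 2.x releases
bin/logstash-plugin install logstash-filter-aggregate   # newer releases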
The main idea of the aggregate filter is to merge two "related" log lines. You can configure the plugin so it knows what "related" means. In your case, "related" means that both events must share the same object name (i.e. one or two), that the first event has its event field set to the start value, and that the second event has its event field set to the stop value.
When the filter encounters the start event, it stores the datetime field of that event in an internal map. When it encounters the stop event, it computes the time difference between the two datetimes and stores the duration in seconds in a new duration field.
input {
  ...
}
filter {
  ...other filters

  # Parse the datetime string into a real timestamp so the two values can be
  # subtracted below (adjust the pattern if your actual format differs).
  date {
    match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
    target => "datetime"
  }

  if [event] == "start" {
    aggregate {
      task_id => "%{object}"                      # correlate start/stop by object name
      code => "map['start'] = event['datetime']"  # remember when this object started
      map_action => "create"
    }
  } else if [event] == "stop" {
    aggregate {
      task_id => "%{object}"
      # attach the elapsed time (in seconds) to the stop event so it gets indexed;
      # on Logstash 5+ use event.get(...) / event.set(...) instead of the [] syntax
      code => "event['duration'] = event['datetime'] - map['start']"
      end_of_task => true
      timeout => 120
    }
  }
}
output {
  elasticsearch {
    ...
  }
}
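Once this runs, every indexed stop document carries the computed duration, roughly like this (the exact datetime formatting depends on your mapping):
{ "object":"one", "event":"stop", "datetime":"2016-02-09T11:25:01.000Z", "duration":120.0 }
{ "object":"two", "event":"stop", "datetime":"2016-01-02T11:24:01.000Z", "duration":60.0 }
In Kibana you can then build a histogram (or any other aggregation) directly on the numeric duration field, without any custom UI.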
Note that you can adjust the timeout value (here 120 seconds) to better suit your needs. When the timeout has elapsed and no stop event has been seen yet, the stored start data for that object is discarded.
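One caveat from the aggregate filter's documentation: the internal map lives in the memory of a single filter worker, so Logstash should be run with one filter worker, otherwise related events may be processed out of order and the correlation will break. For example (the config file name is just a placeholder):
bin/logstash -w 1 -f events-pipeline.conf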