I'm trying to have Logstash output to Elasticsearch, but I'm not sure how to use the mapping I defined in Elasticsearch.
In Kibana, I created an index and mapping like this:
PUT /kafkajmx2
{
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "integer"
        },
        "host": {
          "type": "keyword"
        },
        "metric_path": {
          "type": "text"
        },
        "type": {
          "type": "keyword"
        },
        "path": {
          "type": "text"
        },
        "metric_value_string": {
          "type": "keyword"
        },
        "metric_value_number": {
          "type": "float"
        }
      }
    }
  }
}
I can write data to it like this:
POST /kafkajmx2/kafka_mbeans
{
  "metric_value_number": 159.03478490788203,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "@timestamp": "2017-02-12T23:08:40.934Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate",
  "type": null
}
Now my Logstash config looks like this:
input {
  kafka {
    kafka details here
  }
}
output {
  elasticsearch {
    hosts => "http://elasticsearch:9050"
    index => "kafkajmx2"
  }
}
It writes to the kafkajmx2 index but doesn't use the mapping. When I query it like this in Kibana:
GET /kafkajmx2/kafka_mbeans/_search?q=*
{
}
I get this back:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo34xF_j-lM6k7wBavd",
  "_score": 1,
  "_source": {
    "@timestamp": "2017-02-13T14:31:53.337Z",
    "@version": "1",
    "message": """
{"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null}
"""
  }
}
How do I tell Logstash to use the kafka_mbeans mapping in the output?
-----EDIT-----
I tried my output like this but still get the same results:
output {
  elasticsearch {
    hosts => "http://10.204.93.209:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
    codec => plain {
      format => "%{message}"
    }
  }
}
The data in Elasticsearch should look like this:
{
  "@timestamp": "2017-02-13T14:31:52.654Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count",
  "metric_value_number": 0,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "type": null
}
--------EDIT 2--------------
I at least got the message to parse into JSON by adding a filter like this:
input {
  kafka {
    ...kafka details....
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
  }
}
It still doesn't use the mapping, but at least it parses the JSON correctly, so now I get this:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo4a2Hzj-lM6k7wBcMS",
  "_score": 1,
  "_source": {
    "metric_value_number": 0.9967205071482902,
    "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
    "@timestamp": "2017-02-13T16:54:16.701Z",
    "@version": "1",
    "host": "localhost",
    "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value",
    "type": null
  }
}
What you need to change is very simple. First, use the json codec in your kafka input. There's no need for the json filter, so you can remove it.
kafka {
  ...kafka details....
  codec => "json"
}
Then, in your elasticsearch output, you're missing the mapping type (the document_type parameter below). This is important because it otherwise defaults to logs (as you can see in your search results), which doesn't match your kafka_mbeans mapping type. Moreover, you don't really need to use a template, since your index already exists. Make the following modification:
elasticsearch {
  hosts => "http://node1:9050"
  index => "kafkajmx2"
  document_type => "kafka_mbeans"
}
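Putting the two changes together, the whole pipeline might look roughly like the sketch below. It assumes the same host and index as in the question, and a Logstash 5.x release in which the elasticsearch output accepts document_type. The kafka connection settings are left elided; keep whatever you already have there.

```
input {
  kafka {
    # ...kafka details.... (your existing settings, unchanged)
    codec => "json"                    # decode each Kafka message as JSON into event fields
  }
}
output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    document_type => "kafka_mbeans"    # should match the mapping type defined on the index
  }
}
```

After restarting Logstash with a config like this, newly indexed documents should show "_type": "kafka_mbeans" in the search results instead of "logs".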