We're using the Serilog HTTP sink to send log messages to Logstash, but the HTTP message body looks like this:
{
  "events": [
    {
      "Timestamp": "2016-11-03T00:09:11.4899425+01:00",
      "Level": "Debug",
      "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
      "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
      "Properties": {
        "Heartbeat": {
          "UserName": "Mike",
          "UserDomainName": "Home"
        },
        "Computer": "Workstation"
      }
    },
    {
      "Timestamp": "2016-11-03T00:09:12.4905685+01:00",
      "Level": "Debug",
      "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
      "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
      "Properties": {
        "Heartbeat": {
          "UserName": "Mike",
          "UserDomainName": "Home"
        },
        "Computer": "Workstation"
      }
    }
  ]
}
i.e. the logging events are batched in an array. It is possible to send the messages one by one, but even then they arrive as a one-item array.
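For context, the logger on the application side is configured roughly like this (a sketch only, using the Serilog.Sinks.Http package; the Logstash URL and the heartbeat object are illustrative):
using Serilog;

// Sketch of the Serilog side; the Logstash URL is a placeholder.
var heartbeat = new { UserName = "Mike", UserDomainName = "Home" };

var log = new LoggerConfiguration()
  .MinimumLevel.Debug()
  .WriteTo.Http("http://logstash-host:8080")
  .CreateLogger();

log.Debug("Logging {@Heartbeat} from {Computer}", heartbeat, "Workstation");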
The event is then displayed in Kibana as having a field message with the value
{
  "events": [
    {
      // ...
    },
    {
      // ...
    }
  ]
}
i.e. literally what came in on the HTTP input.
How can I split the items in the events array into individual logging events and "pull up" the properties to the top level, so that I end up with two logging events in Elasticsearch:
"Timestamp": "2016-11-03T00:09:11.4899425+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
"Timestamp": "2016-11-03T00:09:12.4905685+01:00",
"Level": "Debug",
"MessageTemplate": "Logging {@Heartbeat} from {Computer}",
"RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
"Properties": {
"Heartbeat": {
"UserName": "Mike",
"UserDomainName": "Home"
},
"Computer": "Workstation"
}
I tried the Logstash json and split filters, but I can't make them work.
You can achieve what you expect using an additional ruby filter to pull up the fields from the sub-structure:
filter {
  split {
    field => "events"
  }
  ruby {
    code => "
      event.to_hash.update(event['events'].to_hash)
      event.to_hash.delete_if {|k, v| k == 'events'}
    "
  }
}
The resulting event will look like this:
{
  "@version" => "1",
  "@timestamp" => "2017-01-20T04:51:39.223Z",
  "host" => "iMac.local",
  "Timestamp" => "2016-11-03T00:09:12.4905685+01:00",
  "Level" => "Debug",
  "MessageTemplate" => "Logging {@Heartbeat} from {Computer}",
  "RenderedMessage" => "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
  "Properties" => {
    "Heartbeat" => {
      "UserName" => "Mike",
      "UserDomainName" => "Home"
    },
    "Computer" => "Workstation"
  }
}
After upgrading to Logstash 5.0, Val's solution stopped working due to a change in the Event API: updates to event.to_hash were no longer reflected in the original event. For Logstash 5.0+, the event.get('field') and event.set('field', value) accessors must be used instead.
The updated solution is now:
input {
  http {
    port => 8080
    codec => json
  }
}
filter {
  split {
    field => "events"
  }
  ruby {
    code => "
      event.get('events').each do |k, v|
        event.set(k, v)
      end
    "
  }
  mutate {
    remove_field => [ "events" ]
  }
}
You can now achieve this by setting a batchFormatter. The default batch formatter will create faulty events, but the ArrayBatchFormatter will fix this:
logger.WriteTo.DurableHttpUsingFileSizeRolledBuffers(
  requestUri: new Uri($"http://{elasticHost}:{elasticPort}").ToString(),
  batchFormatter: new ArrayBatchFormatter());
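If I recall correctly, ArrayBatchFormatter comes with the Serilog.Sinks.Http package (Serilog.Sinks.Http.BatchFormatters namespace). A minimal sketch of the full logger configuration, with placeholder host and port values:
using Serilog;
using Serilog.Sinks.Http.BatchFormatters;

var elasticHost = "localhost"; // placeholder
var elasticPort = 8080;        // placeholder

// ArrayBatchFormatter posts each batch as a plain JSON array instead of
// wrapping it in an "events" object, which avoids the faulty events.
var logger = new LoggerConfiguration()
  .WriteTo.DurableHttpUsingFileSizeRolledBuffers(
    requestUri: $"http://{elasticHost}:{elasticPort}",
    batchFormatter: new ArrayBatchFormatter())
  .CreateLogger();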