 

Logstash: Merge two logs into one output document

I have set syslog to send logs to logstash, with the following filters:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{job_id}"
  }
}

filter {
  grok {
    overwrite => ["message"]
  }
  json {
    source => "message"
  }
}

A typical message from one of my applications will have an initial state and a job_id:

{"job_id": "xyz782", "state": "processing", "job_type": "something"}

A few minutes or so later, another log line will have the same job_id, a different state, and a processing time:

{"job_id": "xyz782", "state": "failed", "processing_time": 12.345}

These fields get loaded properly, but two documents are created. What I would love is for only one document to be created for the initial log, and for the second log to update that document instead, meaning the updated document would have the following fields:

{"job_id": "xyz782", "state": "failed", "job_type": "something", "processing_time": 12.345}

As you can see in my Logstash output configuration, I use the job_id as the document ID. However, the second message doesn't just overwrite the fields from the first message; it also erases all the fields from the first message that aren't present in the second one. For instance, the job_type field from the first message doesn't appear in the final document. This may have to do with the fact that the JSON comes from the same "message" field both times. Is there another way to merge two log messages into one document in Logstash?

asked Feb 04 '16 by Loic Duros


1 Answer

You can use the aggregate filter in order to do this. The aggregate filter provides support for aggregating several log lines into one single event based on a common field value. In your case, the common field would be the job_id field.

Then we need another field to detect the first event vs the second event that should be aggregated. In your case, this would be the state field.

So you simply need to add another filter to your existing Logstash configuration, like this:

filter {
    ...your other filters

    if [state] == "processing" {
        aggregate {
            task_id => "%{job_id}"
            # remember the fields from the first event
            code => "map['job_type'] = event.get('job_type')"
            map_action => "create"
        }
    } else if [state] == "failed" {
        aggregate {
            task_id => "%{job_id}"
            # copy the remembered fields onto the final event
            code => "event.set('job_type', map['job_type'])"
            map_action => "update"
            end_of_task => true
            timeout => 120
        }
    }
}

Note that the code setting is required by the aggregate filter; it is where you decide which fields from the first event get carried over onto the final one. Since the final event then contains the merged fields and is indexed with the same document_id, it replaces the first document with the complete set of fields.

You are free to adjust the timeout (in seconds) depending on how long your jobs are running.
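As an alternative worth mentioning (not required for the aggregate approach above): the elasticsearch output plugin can also perform partial updates instead of full re-indexing, via its action and doc_as_upsert settings. With this, each event's fields are merged into the existing document rather than replacing it, so the erasure described in the question doesn't happen. A minimal sketch, reusing the job_id-based document ID from the question:

    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        document_id => "%{job_id}"
        action => "update"       # partial update instead of a full index
        doc_as_upsert => true    # create the document if it doesn't exist yet
      }
    }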

answered Oct 19 '22 by Val