
Updating a complex nested Elasticsearch document using Logstash and JDBC

Let's assume that the Oracle schema has the following tables and columns:

    Country
        country_id; (Primary Key)

    Department
        department_id; (Primary Key)
        country_id; (Foreign key to Country:country_id)

    Employee
        employee_id; (Primary Key)
        department_id; (Foreign key to Department:department_id)

My Elasticsearch document has a Country as its root element. Each Country contains all of its Departments, and each Department in turn contains all of its Employees.

So the document structure looks like this:

    {
      "mappings": {
        "country": {
          "properties": {
            "country_id": { "type": "string" },
            "country_name": { "type": "string" },
            "department": {
              "type": "nested",
              "properties": {
                "department_id": { "type": "string" },
                "department_name": { "type": "string" },
                "employee": {
                  "type": "nested",
                  "properties": {
                    "employee_id": { "type": "string" },
                    "employee_name": { "type": "string" }
                  }
                }
              }
            }
          }
        }
      }
    }

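To make the target shape concrete, here is a minimal Ruby sketch (Ruby being the language Logstash uses for ruby and aggregate filter code) that folds flat rows into one nested document per country. It assumes a hypothetical joined query that returns one row per employee with the columns country_id, country_name, department_id, department_name, employee_id and employee_name; the sample values are made up.

```ruby
require 'json'

# Fold flat rows (one per employee, from a hypothetical joined query)
# into one nested document per country, following the mapping above.
def nest_rows(rows)
  countries = {}
  rows.each do |row|
    # create the country document the first time its id is seen
    country = countries[row['country_id']] ||= {
      'country_id' => row['country_id'],
      'country_name' => row['country_name'],
      'department' => []
    }
    # find or create the nested department entry
    dept = country['department'].find { |d| d['department_id'] == row['department_id'] }
    unless dept
      dept = {
        'department_id' => row['department_id'],
        'department_name' => row['department_name'],
        'employee' => []
      }
      country['department'] << dept
    end
    # every row contributes exactly one nested employee
    dept['employee'] << {
      'employee_id' => row['employee_id'],
      'employee_name' => row['employee_name']
    }
  end
  countries.values
end

# made-up sample rows
rows = [
  { 'country_id' => 'C1', 'country_name' => 'France', 'department_id' => 'D1',
    'department_name' => 'Sales', 'employee_id' => 'E1', 'employee_name' => 'Ann' },
  { 'country_id' => 'C1', 'country_name' => 'France', 'department_id' => 'D1',
    'department_name' => 'Sales', 'employee_id' => 'E2', 'employee_name' => 'Bob' },
  { 'country_id' => 'C1', 'country_name' => 'France', 'department_id' => 'D2',
    'department_name' => 'IT', 'employee_id' => 'E3', 'employee_name' => 'Cid' }
]

docs = nest_rows(rows)
puts JSON.pretty_generate(docs)
```

The difficulty in the question is that separate jdbc inputs each see only one of these levels, so no single input can build this whole structure on its own.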
I want separate jdbc inputs, one query per table, that create, update and delete data in the Elasticsearch document whenever rows in the underlying table are added, updated or deleted.

This is an example problem; the actual tables and data structure are more complex, so I am not looking for a solution limited to this case.

Is there a way to achieve this?


PKJ asked Jan 19 '16 14:01


1 Answer

For level one, it's straightforward using the aggregate filter. The rows need a common id that references the document they belong to.

filter {
  aggregate {
    task_id => "%{id}"

    code => "
      map['id'] = event.get('id')
      map['department'] ||= []
      # add the current row to this document's department list
      map['department'] << event.to_hash
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }

  # keep only the aggregated documents and drop the raw rows
  if "aggregated" not in [tags] {
    drop {}
  }
}
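As a rough model of what the filter above does, the plain-Ruby sketch below (not the plugin's code) keeps one map per task id and emits the previous map as a new event as soon as a row with a different id arrives; the last map is flushed at the end, which in Logstash happens when the timeout expires. This only works if rows arrive sorted by id, which is why the aggregate filter documentation asks for a single filter worker, and sorted input (for example an ORDER BY in the jdbc statement) is needed. The row data is made up.

```ruby
# Model of aggregate + push_previous_map_as_event => true.
# Rows are assumed to be sorted by 'id'.
def aggregate_rows(rows)
  emitted = []
  current_id = nil
  map = nil
  rows.each do |event|
    if map.nil? || event['id'] != current_id
      emitted << map unless map.nil?   # push the previous map as an event
      current_id = event['id']
      map = {}
    end
    # same three steps as the filter's code string
    map['id'] = event['id']
    map['department'] ||= []
    map['department'] << event.dup
  end
  emitted << map unless map.nil?       # final flush (the timeout in Logstash)
  emitted
end

rows = [
  { 'id' => 'C1', 'department_id' => 'D1', 'department_name' => 'Sales' },
  { 'id' => 'C1', 'department_id' => 'D2', 'department_name' => 'IT' },
  { 'id' => 'C2', 'department_id' => 'D3', 'department_name' => 'HR' }
]

aggregate_rows(rows).each { |doc| p doc }
```

If the rows were not sorted, C1 would be emitted twice with partial department lists, and the second update would overwrite the first.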

Important: the output action should be "update"; an "index" action would overwrite the whole document.

    output {
        elasticsearch {
            action => "update"
            document_id => "%{id}"   # update needs the id of the target document
        }
    }
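Why "update" matters: an index action replaces the whole stored document, so a document written by one jdbc input would lose the fields written by another. The Ruby sketch below only models this and is not Logstash or Elasticsearch code. It assumes the partial-update semantics of the Elasticsearch update API (objects merged key by key, arrays and other values replaced) and builds the body of an _update request with doc_as_upsert, which lets the first update create a missing document; the Logstash elasticsearch output exposes the same flag as doc_as_upsert. The sample documents are made up.

```ruby
require 'json'

# Approximation of how a partial update merges into the stored document:
# hashes (objects) are merged key by key; anything else, arrays included,
# is replaced by the new value.
def merge_partial(existing, partial)
  existing.merge(partial) do |_key, old_val, new_val|
    if old_val.is_a?(Hash) && new_val.is_a?(Hash)
      merge_partial(old_val, new_val)
    else
      new_val
    end
  end
end

# Body of an _update request built from an aggregated map;
# doc_as_upsert creates the document when it does not exist yet.
def update_body(aggregated_map)
  { 'doc' => aggregated_map, 'doc_as_upsert' => true }
end

# stored document, e.g. written earlier by the jdbc input for Country
stored = { 'id' => 'C1', 'country_name' => 'France', 'department' => [] }
# map emitted by the aggregate filter for the Department rows of C1
map = { 'id' => 'C1', 'department' => [{ 'department_id' => 'D1' }] }

puts JSON.generate(update_body(map))
merged = merge_partial(stored, update_body(map)['doc'])
p merged
```

Because arrays are replaced rather than merged, each aggregated map has to carry the complete department list for its country, not only the rows that changed.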

One way to solve level two is to query the already indexed document and update it with the nested records, again using the aggregate filter. There must be a common id for the document so that you can look it up and insert the records into the correct document.

filter {
    # get the document from Elasticsearch by id and copy its 'employee' field into 'emp'
    elasticsearch {
        hosts => ["${ELASTICSEARCH_HOST}"]
        index => "${INDEX_NAME}"
        query => "id:%{id}"
        fields => { "employee" => "emp" }
    }

    aggregate {
        task_id => "%{id}"
        code => "
            map['id'] = event.get('id')
            map['employee'] = []
            employeeArr = []
            temp_emp = {}

            # copy the current row, leaving out the looked-up 'emp' field
            event.to_hash.each do |key, value|
                temp_emp[key] = value unless key == 'emp'
            end

            # push the objects into an array
            employeeArr << temp_emp

            # attach the array to every object fetched from the stored document
            empArr = event.get('emp') || []

            for emp in empArr
                emp['employee'] = employeeArr
            end

            map['employee'] = empArr
        "
        push_previous_map_as_event => true
        timeout => 150000
        timeout_tags => ['aggregated']
    }

    if "aggregated" not in [tags] {
        drop {}
    }
}
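The level-two step can also be sketched by matching each employee row to its department by department_id instead of attaching the same array to every fetched object. This is a plain-Ruby variant, not the answer's exact method. It assumes that the stored document's department array (as copied by the elasticsearch filter) is available as departments, that each employee row carries department_id, and that departments with no rows in the current batch should stay unchanged; field names follow the mapping in the question and the data is made up.

```ruby
# Attach employee rows to the already-indexed departments, matching on
# department_id. Departments without rows in this batch are returned as-is.
def attach_employees(departments, employee_rows)
  by_dept = employee_rows.group_by { |row| row['department_id'] }
  departments.map do |dept|
    rows = by_dept[dept['department_id']]
    next dept unless rows              # nothing new for this department
    employees = rows.map do |r|
      { 'employee_id' => r['employee_id'], 'employee_name' => r['employee_name'] }
    end
    dept.merge('employee' => employees) # new hash; the input is not mutated
  end
end

stored_departments = [
  { 'department_id' => 'D1', 'department_name' => 'Sales', 'employee' => [] },
  { 'department_id' => 'D2', 'department_name' => 'IT',
    'employee' => [{ 'employee_id' => 'E9', 'employee_name' => 'Old' }] }
]
employee_rows = [
  { 'id' => 'C1', 'department_id' => 'D1', 'employee_id' => 'E1', 'employee_name' => 'Ann' },
  { 'id' => 'C1', 'department_id' => 'D1', 'employee_id' => 'E2', 'employee_name' => 'Bob' }
]

updated = attach_employees(stored_departments, employee_rows)
p updated
```

In the pipeline, the resulting array would be stored on the aggregate map (for example under 'department') so that the update action sends the complete, modified department list for the country.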


output {
    elasticsearch {
        action => "update"    # important
        document_id => "%{id}"
    }
}

Also, to debug the Ruby code, add this to the output section; the rubydebug codec prints every field of each event:

    stdout { codec => rubydebug }
Polynomial Proton answered Nov 11 '22 20:11
