Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to index nested mysql object into elasticsearch using logstash?

I'm trying to index mysql database with elasticsearch. Consider the example mapping:

{"blog":
  {"properties": 
    {"id": "string"}
    {"author": "string"}
    {"time_created": }
    {"author_info": 
      {"author_name":}
      {"author_sex":}
    }
    {"posts":
      {"post_author":}
      {"post_time":}
    }
  }
}

I have three tables which are author_info, blog and post. How can I index these records into elastic with a nested structure? I cannot find documents about it. Thanks

like image 995
Yangrui Avatar asked Jan 28 '26 15:01

Yangrui


1 Answers

    input {
    jdbc{
        jdbc_validate_connection => true
        jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/_db"
        jdbc_user => "root"
        jdbc_password => "admin"
        jdbc_driver_library => "/home/ilsa/mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        clean_run => true   
        statement => "SELECT  
            u.id as employee_number, u.email as email, u.username as username, 
            up.id as post_id, up.text_content as content, 
            pc.id as comment_id , pc.user_post_id as comment_post_id, pc.comment as comment_text 
            FROM users u join user_posts up on up.user_id = u.id 
            LEFT JOIN  post_comments pc ON pc.user_post_id = up.id 
            ORDER BY up.id ASC"
    }

}
filter {
    aggregate {
        task_id => "%{employee_number}"
        code => "
            map['employee_number'] = event.get('employee_number')
            map['email'] = event.get('email')
            map['username'] = event.get('username')
            map['posts'] ||= []
            map['posts'] << {

                'post_id' => event.get('post_id'),
                'content' => event.get('content'),
                'comments' => [] << { 
                    'comment_id' => event.get('comment_id'),
                    'comment_post_id' => event.get('comment_post_id'),
                    'comment_text' => event.get('comment_text')
                }

            }
        event.cancel()"
        push_previous_map_as_event => true
        timeout => 30
    }
}
output {
    stdout{ codec => rubydebug }
    elasticsearch{
        action => "index"
        index => "_dev"
        document_type => "_doc"
        document_id => "%{employee_number}"
        hosts => "localhost:9200"
    }

}
like image 148
Saif Avatar answered Jan 30 '26 06:01

Saif