I am trying to create a data pipeline in which the Logstash jdbc input plugin fetches data with a SQL query every 5 minutes and the Elasticsearch output plugin writes that data to an Elasticsearch server. I want the output plugin to partially update existing documents in Elasticsearch. My Logstash configuration file looks like this:
input {
  jdbc {
    jdbc_driver_library => "/Users/hello/logstash-2.3.2/lib/mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:13306/mysqlDB"
    jdbc_user => "root"
    jdbc_password => "1234"
    last_run_metadata_path => "/Users/hello/.logstash_last_run_display"
    statement => "SELECT * FROM checkout WHERE checkout_no between :sql_last_value + 1 and :sql_last_value + 5 ORDER BY checkout_no ASC"
    schedule => "*/5 * * * *"
    use_column_value => true
    tracking_column => "checkout_no"
  }
}
output {
  stdout { codec => json_lines }
  elasticsearch {
    action => "update"
    index => "ecs"
    document_type => "checkout"
    document_id => "%{checkout_no}"
    hosts => ["localhost:9200"]
  }
}
The problem is that the Elasticsearch output plugin does not appear to call the partial update API, such as /{index}/{type}/{id}/_update. The manual just lists actions such as index, delete, create, and update, but it doesn't mention which REST API URL each action calls, i.e. whether the update action calls /{index}/{type}/{id}/_update or the /{index}/{type}/{id} API (upsert). Is it possible to call the partial update API from the Elasticsearch output plugin?
Setting both doc_as_upsert => true and action => "update" works in my production script.
output {
  elasticsearch {
    hosts => ["es_host"]
    document_id => "%{id}" # !!! the id here MUST be the same
    index => "logstash-my-index"
    timeout => 30
    workers => 1
    doc_as_upsert => true
    action => "update"
  }
}
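To make the question about which API gets called a bit more concrete, here is a rough sketch of what a partial update with doc_as_upsert looks like when it is expressed as a _bulk request body. As far as I understand, the output plugin batches events through the _bulk endpoint rather than calling /{index}/{type}/{id}/_update one document at a time, but treat that as an assumption and not something confirmed in this thread. The helper name build_bulk_update and the "status" field are hypothetical, made up only for illustration; the index and type names are taken from the question.

```python
import json


def build_bulk_update(index, doc_type, doc_id, fields, doc_as_upsert=True):
    """Build the two NDJSON lines for one partial-update action in a _bulk body.

    Hypothetical helper for illustration only; it is not part of Logstash.
    """
    # First line: the action and the metadata identifying the target document.
    action = {"update": {"_index": index, "_type": doc_type, "_id": str(doc_id)}}
    # Second line: only the fields to merge into the existing document.
    payload = {"doc": fields}
    if doc_as_upsert:
        # Ask Elasticsearch to insert "doc" as a new document if the id is missing.
        payload["doc_as_upsert"] = True
    return json.dumps(action) + "\n" + json.dumps(payload) + "\n"


# "status" is a made-up column; a real event would carry the SELECT's columns.
body = build_bulk_update("ecs", "checkout", 42, {"status": "paid"})
print(body, end="")
```

The point of the sketch is that an update action carries the event under "doc", so existing fields that are not in the event stay untouched, while an index action would replace the whole document.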