I'm quite unclear about what sql_last_value does when I write my statement like this:
statement => "SELECT * from mytable where id > :sql_last_value"
I partly understand the reason for using it: instead of scanning the whole table to update fields, it only picks up newly added records. Correct me if I'm wrong.
What I'm trying to do is create the index using Logstash, like this:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
schedule => "* * * * *"
statement => "SELECT * from mytable where id > :sql_last_value"
use_column_value => true
tracking_column => id
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
output {
elasticsearch {
#protocol => http
index => "myindex"
document_type => "message_logs"
document_id => "%{id}"
action => index
hosts => ["http://myhostmachine:9402"]
}
}
Once I do this, no documents are uploaded to the index at all. Where am I going wrong?
Any help would be appreciated.
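In simple words, sql_last_value lets Logstash persist a value from the last SQL run, as its name suggests. This is especially useful when you schedule your query.
Each time Logstash polls MySQL, it saves sql_last_value to a metadata file. By default this value is the time the query last ran; if use_column_value is true, it is instead the value of tracking_column in the last row read (for example, the highest id seen so far).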
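For reference, here is a minimal sketch of the jdbc input options that control this persisted value. The file path is a hypothetical example; depending on the plugin version, the default location is $HOME/.logstash_jdbc_last_run or a file under Logstash's data directory, so setting it explicitly avoids guessing where the state lives.

```
input {
  jdbc {
    # ... connection settings as in the question ...
    # track the value of a column instead of the time of the last run
    use_column_value => true
    tracking_column => "id"
    # "numeric" (the default) or "timestamp"
    tracking_column_type => "numeric"
    # hypothetical path: where sql_last_value is saved between runs
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run"
    # save sql_last_value after each run (this is the default)
    record_last_run => true
  }
}
```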
If your table has a timestamp column (e.g. last_updated) that is refreshed whenever a row changes, you should preferably track it instead of the ID. An update never changes a row's ID, so an ID-based query will not pick up updated records, whereas a query on last_updated will, because the jdbc input plugin sees the new timestamp.
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db"
jdbc_user => "root"
jdbc_password => "root"
jdbc_validate_connection => true
jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
statement => "SELECT * from mytable where last_updated > :sql_last_value"
}
}
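The config above compares last_updated with the time of the previous run. If you would rather have sql_last_value hold the newest last_updated value actually read from the table, a sketch along these lines should work, assuming last_updated is a DATETIME or TIMESTAMP column. This ties the comparison to values stored in the database rather than to Logstash's own clock.

```
input {
  jdbc {
    # ... same connection and paging settings as above ...
    schedule => "* * * * *"
    statement => "SELECT * FROM mytable WHERE last_updated > :sql_last_value ORDER BY last_updated"
    use_column_value => true
    tracking_column => "last_updated"
    # tell the plugin the tracked column holds timestamps, not numbers
    tracking_column_type => "timestamp"
  }
}
```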
If you decide to stay with the ID column nonetheless, you should delete the $HOME/.logstash_jdbc_last_run
file and try again.
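Instead of deleting the file by hand, the jdbc input also has a clean_run option that ignores the saved state on startup, so sql_last_value starts again from 0 (or 1 January 1970 for time-based tracking). A sketch, intended for a one-off re-import only:

```
input {
  jdbc {
    # ... same settings as above ...
    # discard the previously saved sql_last_value and start from scratch
    clean_run => true
  }
}
```

Remove the option again afterwards; otherwise every Logstash restart re-reads the whole table.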
There are a few things to take care of:
1. If you have previously run Logstash without the schedule, delete this file before running it with the schedule:
$HOME/.logstash_jdbc_last_run
On Windows, this file is found at:
C:\Users\<Username>\.logstash_jdbc_last_run
2. The statement in the Logstash config should ORDER BY the tracking_column.
3. tracking_column must name the same column that the statement compares with :sql_last_value.
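Applied to the config in the question, these points suggest something like the following sketch. Because the plugin lowercases column names by default (lowercase_column_names => true), tracking_column should be written in lowercase. With jdbc_paging_enabled, the ORDER BY also keeps the LIMIT/OFFSET pages consistent, so the last row read carries the highest id.

```
input {
  jdbc {
    # ... connection, schedule and paging settings as in the question ...
    statement => "SELECT * FROM mytable WHERE id > :sql_last_value ORDER BY id"
    use_column_value => true
    tracking_column => "id"
  }
}
```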
Here is an example of the Logstash config file:
input {
jdbc {
# MySQL DB jdbc connection string to our database, softwaredevelopercentral
jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
# The user we wish to execute our statement as
jdbc_user => "root"
# The user password
jdbc_password => ""
# The path to our downloaded jdbc driver
jdbc_driver_library => "D:\Programs\MySQLJava\mysql-connector-java-6.0.6.jar"
# The name of the driver class for MySQL DB
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
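# run the query once every minute (cron syntax)
schedule => "* * * * *"
# our query: fetch only rows newer than the last studentid seen, in order
statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"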
use_column_value => true
tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch {
hosts => ["localhost:9200"]
index => "students"
document_type => "student"
document_id => "%{studentid}"
}
}
For a working example, see my blog post: http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html