Let's assume that the Oracle schema has the following tables and columns:
Country
    country_id; (Primary Key)
    country_name;

Department
    department_id; (Primary Key)
    department_name;
    country_id; (Foreign Key to Country:country_id)

Employee
    employee_id; (Primary Key)
    employee_name;
    department_id; (Foreign Key to Department:department_id)
I have an Elasticsearch document whose root element is a Country; it contains all the Departments in that Country, which in turn contain all the Employees in their respective Departments.
So the mapping looks like this:
{
  "mappings": {
    "country": {
      "properties": {
        "country_id": { "type": "string" },
        "country_name": { "type": "string" },
        "department": {
          "type": "nested",
          "properties": {
            "department_id": { "type": "string" },
            "department_name": { "type": "string" },
            "employee": {
              "type": "nested",
              "properties": {
                "employee_id": { "type": "string" },
                "employee_name": { "type": "string" }
              }
            }
          }
        }
      }
    }
  }
}
I want to have separate jdbc input queries running against each table, and they should create/update/delete data in the Elasticsearch document whenever data in the base tables is added/updated/deleted.
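For example, a minimal sketch of a Country+Department input (driver path, connection string, credentials, and schedule are placeholders; the real statement would select whatever the documents need):

input {
  jdbc {
    jdbc_driver_library => "/path/to/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@//dbhost:1521/ORCL"
    jdbc_user => "db_user"
    jdbc_password => "db_password"
    schedule => "* * * * *"
    # one row per (country, department) pair, ordered by the shared id
    # so that related rows arrive together
    statement => "SELECT c.country_id AS id, c.country_name, d.department_id, d.department_name FROM Country c JOIN Department d ON d.country_id = c.country_id ORDER BY c.country_id"
  }
}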
This is an example problem; the actual tables and data structures are more complex, so I am not looking for a solution limited to this case.
Is there a way to achieve this?
Thanks.
For level one, it's straightforward using the aggregate filter. You need a common id between the parent and child rows to reference.
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['department'] ||= []
      # append this row's fields as one department entry
      map['department'] << event.to_hash
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }
  # only the aggregated event should reach the output; drop the individual rows
  if "aggregated" not in [tags] {
    drop {}
  }
}
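With that in place, the event pushed at timeout looks roughly like this (values are illustrative; since the code appends the whole row hash, each department entry also carries the row's other fields, which you would prune in a real pipeline, e.g. with a mutate filter):

{
  "id": "1",
  "department": [
    { "id": "1", "country_name": "Sweden", "department_id": "10", "department_name": "Sales" },
    { "id": "1", "country_name": "Sweden", "department_id": "20", "department_name": "HR" }
  ]
}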
Important: the output action should be update.
output {
  elasticsearch {
    action => "update"
    ...
  }
}
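For the update to land on the right document, the output also needs a document_id, and doc_as_upsert so the first event for a country creates the document instead of failing. A minimal sketch (host and index name are assumptions):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "countries"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true # create the document if it does not exist yet
  }
}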
One way to solve level two is to query the already-indexed document and update it with the nested record, again using the aggregate filter. There must be a common id for the document so you can look up and insert into the correct one.
filter {
  # fetch the already-indexed document by id and copy its
  # 'employee' field into this event as 'emp'
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}"]
    index => "${INDEX_NAME}"
    query => "id:%{id}"
    fields => { "employee" => "emp" }
  }
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['employee'] = []
      # collect this row's fields into a hash
      temp_emp = {}
      event.to_hash.each do |key, value|
        temp_emp[key] = value
      end
      # push the object into an array
      employeeArr = [temp_emp]
      # attach the new entry to each record fetched from the index
      # (guard against a missing document on the first run)
      empArr = event.get('emp') || []
      for emp in empArr
        emp['employee'] = employeeArr
        map['employee'].push(emp)
      end
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }
  if "aggregated" not in [tags] {
    drop {}
  }
}
output {
  elasticsearch {
    action => "update" # important
    ...
  }
}
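Once both levels have run, the indexed country document ends up shaped like the mapping above, roughly as follows (values are illustrative; the exact field names depend on what your queries select):

{
  "country_id": "1",
  "country_name": "Sweden",
  "department": [
    {
      "department_id": "10",
      "department_name": "Sales",
      "employee": [
        { "employee_id": "100", "employee_name": "Alice" },
        { "employee_id": "101", "employee_name": "Bob" }
      ]
    }
  ]
}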
Also, to debug the Ruby code, add a stdout output; codec => rubydebug prints each full event, while codec => dots just emits one dot per event to confirm throughput:
output {
  stdout { codec => rubydebug }
}