Is it possible to do Python-like scripting in Logstash? I can import the CSV data into Elasticsearch using Logstash, but I need to use the update API instead of simply indexing all rows.
Here is my sample CSV file...
vi /tmp/head.txt
"Home","Home-66497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj","919359000000","HMSHOP","916265100000","2016-05-18 08:41:49"
"Home","Home-26497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1V1","919359000001","HMSHOP","916265100000","2016-05-18 18:41:49"
"Home","Home-36497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/SZj1","919359000001","HMSHOP","916265100000","2016-05-18 12:41:49"
"Home","Home-46497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1","919359000000","HMSHOP","916265100000","2016-05-18 14:41:49"
"Home","Home-56497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj1xc","919359000000","HMSHOP","916265100000","2016-05-18 16:41:49"
Here is my Logstash config file...
vi logstash.conf
input {
  file {
    path => "/tmp/head.txt"
    type => "csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"]
    separator => ","
  }
}
output {
  elasticsearch {
    action => "index"
    hosts => ["172.17.0.1"]
    index => "logstash-%{+YYYY.MM.dd}"
    workers => 1
  }
}
I have confirmed that the above configuration is working as expected and all 5 records are stored as 5 separate documents.
Here is my Docker command...
docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf
The problem is that I need to merge the documents based on the destination number. The destination should become the ID of the document. Some rows share the same destination; for example, the document with _id 919359000001 should contain both of the following records as nested objects.
"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"
"user": "Home", "messageid" "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49"
The CSV data is correctly converted to JSON as shown above. What I need is to reformat the request to take advantage of scripting via the update API. The following bulk request works correctly:
POST /test_index/doc/_bulk
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }}
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }}
How do I write the Logstash configuration so that my CSV data is converted into requests like the above?
Update
I have Python code that works as expected. I would like to know how to modify it to suit the "output" parameters suggested in the answer. In the following example, df_json is a Python object that is simply a pandas DataFrame flattened to JSON.
import copy
import json

with open('myfile.txt', 'w') as f:
    for doc1 in df_json:
        doc = mydict(doc1)  # mydict: the helper I use to turn a flattened row into a dict
        docnew = copy.deepcopy(doc)
        del docnew['destination']  # the destination becomes the document _id, so drop it from the nested record
        action = '{ "update": {"_id": %s }}\n' % doc['destination']
        f.write(action)
        # json.dumps ensures the params and upsert payloads are written as valid JSON
        entry = '{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s] }}\n' % (json.dumps(doc), json.dumps(docnew))
        f.write(entry)
! curl -s -XPOST XXX.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo
Update 2
I tried the following configuration, and it replaces the documents instead of updating them as per the script.
output {
  elasticsearch {
    action => "index"
    hosts => ["172.17.0.1"]
    document_id => "%{destination}"
    index => "logstash3-%{+YYYY.MM.dd}"
    workers => 1
    script => "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]"
    script_type => "inline"
    script_lang => "groovy"
    scripted_upsert => "true"
  }
}
When I changed the action to "update", I got the following error...
:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000",
"status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script",
"caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run in line script
[ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]]
using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"No such property: user for class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn}
Update 3
As per Val's answer, I added the event prefix and got this error...
:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"Cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}}
Update 4
As per Val's updated answer, I tried this...
script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"
And got this error:
{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"}
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}
Only 2 records were added to the database.
Wherever scripting is supported in the Elasticsearch APIs, the syntax follows the same pattern: you specify the language of your script, provide the script logic (or source), and add parameters that are passed into the script:
"script": {
  "lang":   "...",
  "source" | "id": "...",
  "params": { ... }
}
Painless is a simple, secure scripting language designed specifically for use with Elasticsearch. It is the default scripting language for Elasticsearch and can safely be used for inline and stored scripts.
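For example, a minimal single-document update following that pattern could look like the request below (illustrative only: it uses Painless with a params map and assumes the parent array already exists on the document, whereas the setup in the question still relies on the older Groovy syntax):
POST /test_index/doc/919359000001/_update
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.parent.add(params.record)",
    "params": {
      "record": { "user": "Home", "messageid": "Home-26497273a5a83c99", "code": "HMSHOP" }
    }
  }
}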
The elasticsearch output plugin supports script parameters:
output {
  elasticsearch {
    action => "update"
    hosts => ["172.17.0.1"]
    index => "logstash-%{+YYYY.MM.dd}"
    workers => 1
    script => "<your script here>"
    script_type => "inline"
    # Set the language of the used script
    # script_lang =>
    # if enabled, script is in charge of creating non-existent document (scripted update)
    # scripted_upsert => (default is false)
  }
}
The event is passed to the script in your output using the event variable name (by default, but you can change it using the script_var_name setting). So the script in your output needs to account for it.
script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"