Scripting in Logstash

Is it possible to do Python-like scripting in Logstash? I can import the CSV data into Elasticsearch using Logstash, but I need to use the update API instead of simply indexing all rows.

Here is my sample CSV file...

vi /tmp/head.txt
"Home","Home-66497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj","919359000000","HMSHOP","916265100000","2016-05-18 08:41:49"
"Home","Home-26497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1V1","919359000001","HMSHOP","916265100000","2016-05-18 18:41:49"
"Home","Home-36497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/SZj1","919359000001","HMSHOP","916265100000","2016-05-18 12:41:49"
"Home","Home-46497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1","919359000000","HMSHOP","916265100000","2016-05-18 14:41:49"
"Home","Home-56497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj1xc","919359000000","HMSHOP","916265100000","2016-05-18 16:41:49"

Here is my Logstash config file...

vi logstash.conf
input {
    file {
        path => "/tmp/head.txt"
        type => "csv"
        start_position => "beginning"
    }
}
filter {
    csv {
        columns => ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"]
        separator => ","
    }
}

output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
    }
}

I have confirmed that the above configuration works as expected: all 5 records are stored as 5 separate documents.

Here is my Docker command...

docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf

The problem is that I need to merge the documents based on the destination number. The destination should be the ID of the document. Some rows share the same destination; for example, the document with _id: 919359000001 should contain both of the following records as nested objects.

"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"
"user": "Home", "messageid" "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49"

Elasticsearch correctly converts the CSV data to JSON as shown above. What I need is to reformat the request to take advantage of scripting via the update API. The following bulk request works correctly:

POST /test_index/doc/_bulk
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }}
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }}

How do I configure Logstash to convert my CSV data into bulk updates like the above?


Update

I have Python code that works as expected. I would like to know how to modify this code to suit the "output" parameters suggested in the answer. In the following example, df_json is a Python object that is simply a DataFrame flattened to JSON.

import copy
import json

with open('myfile.txt', 'w') as f:
    for doc1 in df_json:
        doc = mydict(doc1)  # mydict converts each flattened row into a plain dict
        docnew = copy.deepcopy(doc)
        del docnew['destination']  # the destination becomes the document _id
        action = '{ "update": {"_id": "%s" }}\n' % doc['destination']
        f.write(action)
        # json.dumps emits valid JSON; a bare %s on a dict would produce
        # Python repr with single-quoted keys instead
        entry = ('{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s] }}\n'
                 % (json.dumps(doc), json.dumps(docnew)))
        f.write(entry)

! curl -s -XPOST XXX.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo

Update 2

I tried the following configuration, and it replaces the documents instead of updating them via the script.

output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        document_id => "%{destination}"
        index => "logstash3-%{+YYYY.MM.dd}"
        workers => 1
        script => "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]"
        script_type => "inline"
        script_lang =>  "groovy"
        scripted_upsert => "true"
    }
}

When I changed the action to "update" (the script only takes effect with the update action), I got the following error...

:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000", 
"status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", 
"caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run in line script 
[ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]] 
using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"No such property: user for class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn}

Update 3

As per Val's answer I added the event variable and got this error...

:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"Cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}}

Update 4

As per Val's updated answer, which initializes ctx._source.parent when it does not exist yet (the cause of the null_pointer_exception above), I tried this...

script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"

And got this error:

{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"}
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}

Only 2 records were added to the database. (The empty :source in the warning suggests the file input read an empty line.)

asked Jul 28 '16 by shantanuo

2 Answers

The elasticsearch output plugin supports script parameters:

output {
    elasticsearch {
        action => "update"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
        script => "<your script here>"
        script_type => "inline"
        # Set the language of the used script
        # script_lang => 
        # if enabled, script is in charge of creating non-existent document (scripted update)
        # scripted_upsert => (default is false)
    }
}
answered by alpert

The event is passed to the script in your output via a variable named event by default (you can change the name with the script_var_name setting).

So the script in your output needs to account for it.

    script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"
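
Putting it together with the settings from Update 2 in the question, the whole output block would look roughly like this (an untested sketch; hosts, index, document_id, and the other settings are taken from the question):

output {
    elasticsearch {
        action => "update"
        hosts => ["172.17.0.1"]
        index => "logstash3-%{+YYYY.MM.dd}"
        document_id => "%{destination}"
        workers => 1
        script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"
        script_type => "inline"
        script_lang => "groovy"
        scripted_upsert => "true"
    }
}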
answered by Val