I wrote a bash script that finds CSV files in specified folders and pipes them into Logstash with the matching config file. However, when running this script I run into the following error saying that the shutdown process is stalled, which causes an infinite loop until I stop it manually with Ctrl+C:
[2018-03-22T08:59:53,833][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-03-22T08:59:54,211][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-22T08:59:57,970][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-22T08:59:58,116][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0xf6851b3 run>"}
[2018-03-22T08:59:58,246][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-22T08:59:58,976][INFO ][logstash.outputs.file ] Opening file {:path=>"/home/kevin/otrs_customer_user"}
[2018-03-22T09:00:06,471][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
[2018-03-22T09:00:06,484][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2018-03-22T09:00:11,438][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
When I run the same file with the same config manually, e.g.
bash logstash -f xyz.config < myfile.csv
it works as desired and the process terminates properly. The bash script uses essentially the same command, yet I run into the error above.
I also noticed that the problem appears randomly, and not always with the same file and config.
My config consists of a stdin input, a csv filter, and, for testing, a file output in JSON format (I also removed stdout {}).
Does anybody have an idea why the process stalls during script execution? If not, is there a way to tell Logstash to shut down when it is stalled?
Sample config:
input {
  stdin {
    id => "${LS_FILE}"
  }
}

filter {
  mutate {
    add_field => { "foo_type" => "${FOO_TYPE}" }
    add_field => { "[@metadata][LS_FILE]" => "${LS_FILE}" }
  }
  if [@metadata][LS_FILE] == "contacts.csv" {
    csv {
      separator => ";"
      columns => [
        "IOT",
        "OID",
        "SUM",
        "XID",
        "kundenid"
      ]
    }
    if [kundenid] {
      mutate {
        update => { "kundenid" => "n-%{kundenid}" }
      }
    }
  }
}

output {
  if [@metadata][LS_FILE] == "contacts.csv" {
    file {
      path => "~/contacts_file"
      codec => json_lines
    }
  }
}
Sample script:
LOGSTASH="/customer/app/logstash-6.2.3/bin/logstash"
for file in $(find $TARGETPATH -name *.csv) # Loop each file in given path
do
if [[ $file = *"foo"* ]]; then
echo "Importing $file"
export LS_FILE=$(basename $file)
bash $LOGSTASH -f $CFG_FILE < $file # Starting logstash
echo "file $file imported."
fi
done
I export environment variables in the bash script and read them into metadata in the Logstash configs to perform conditionals for different input files. The JSON output to a file is just for testing purposes.
Logstash performs several steps when you ask it to shut down: it stops all input, filter, and output plugins, processes all in-flight events, and then terminates the process. Several factors can make the shutdown process unpredictable, such as an input plugin receiving data at a slow pace, a slow filter, or a disconnected output plugin waiting to reconnect to flush in-flight events.
From the Logstash documentation:
Logstash has a stall detection mechanism that analyzes the behavior of the pipeline and plugins during shutdown. This mechanism produces periodic information about the count of inflight events in internal queues and a list of busy worker threads.
You can use the --pipeline.unsafe_shutdown flag when starting Logstash to force-terminate the process in the case of a stalled shutdown. When --pipeline.unsafe_shutdown isn't enabled, Logstash continues to run and produces these reports periodically; this is why the problem appears to be random in your case.
Remember that unsafe shutdowns, force-kills of the Logstash process, or crashes of the Logstash process for any other reason may result in data loss (unless you've enabled Logstash to use persistent queues).
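As a minimal sketch (reusing the variables from the script in the question), the loop's Logstash invocation could enable the flag like this; setting pipeline.unsafe_shutdown: true in config/logstash.yml is the persistent equivalent:
# Force Logstash to terminate even if the shutdown stalls (in-flight events may be lost)
bash "$LOGSTASH" --pipeline.unsafe_shutdown -f "$CFG_FILE" < "$file"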