
Logstash with multiple kafka inputs

I am trying to filter Kafka events from multiple topics, but once all events from one topic have been filtered, Logstash is not able to fetch events from the other Kafka topic. I am using topics with 3 partitions and a replication factor of 2. Here is my Logstash config file:

input {
    kafka {
        auto_offset_reset => "smallest"
        consumer_id => "logstashConsumer1"
        topic_id => "unprocessed_log1"
        zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
        type => "kafka_type_1"
    }
    kafka {
        auto_offset_reset => "smallest"
        consumer_id => "logstashConsumer1"
        topic_id => "unprocessed_log2"
        zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
        type => "kafka_type_2"
    }
}
filter {
    if [type] == "kafka_type_1" {
        csv {
            separator => " "
            source => "data"
        }
    }
    if [type] == "kafka_type_2" {
        csv {
            separator => " "
            source => "data"
        }
    }
}
output {
    stdout { codec => rubydebug { metadata => true } }
}
Abhijeet asked Aug 09 '16 21:08


2 Answers

It's a very late reply, but if you want to read from multiple topics and write to multiple topics on another Kafka cluster, you can do something like this:

input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    # Adds Kafka metadata (topic, partition, offset, ...) to each event
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
    consumer_threads => 5
  }
}

output {
  # Note: newer versions of the kafka input plugin expose the topic as
  # [@metadata][kafka][topic] rather than [kafka][topic].
  if [kafka][topic] == "topic1" {
    kafka {
      codec => "json"
      topic_id => "new_topic1"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
  else if [kafka][topic] == "topic2" {
    kafka {
      codec => "json"
      topic_id => "new_topic2"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
}

Be careful when specifying your bootstrap servers: use the host names that your Kafka brokers publish in their advertised listeners.

Ref-1: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id

Ref-2: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events
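
As a variation on the conditional outputs above, the destination topic can be derived from the source topic in a single output block. This is only a sketch, and it rests on two assumptions that are not part of the answer: that the installed kafka input plugin stores the topic under [@metadata][kafka][topic] (recent plugin versions do; older ones used [kafka][topic]), and that a "new_" prefix matches the naming of your target topics. The broker addresses and group_id are reused from the config above.

input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    # Needed so that the source topic is recorded on each event
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
  }
}

output {
  kafka {
    codec => "json"
    # Assumption: the target topic is the source topic prefixed with "new_",
    # and the source topic is available under [@metadata][kafka][topic]
    topic_id => "new_%{[@metadata][kafka][topic]}"
    bootstrap_servers => "output-kafka-1:9092"
  }
}

If the field reference does not resolve, events would be sent to a topic whose name contains the literal %{...} text, so it is worth checking the events with a stdout output (rubydebug with metadata => true) before relying on this.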

daemonsl answered Sep 26 '22 23:09


The previous answer didn't work for me; it seems the conditional statements in the output were not recognized. Here is my answer, which is correct and valid at least for my case: I define tags in the input for both Kafka consumers, and the documents (in my case, logs) are ingested into separate indexes according to their consumer topics.

input {
  kafka {
    group_id => "35834"
    topics => ["First-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["First-Topic"]
  }
  kafka {
    group_id => "35834"
    topics => ["Second-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["Second-Topic"]
  }
}

filter {
}

output {
  if "Second-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "logger"
    }
    stdout { codec => rubydebug }
  }
  else if "First-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "saga"
    }
    stdout { codec => rubydebug }
  }
}
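
The two conditional branches can also be collapsed into one output by carrying the index name in event metadata. The following is a minimal sketch under assumptions: the [@metadata][target_index] field name and the separate group_id and client_id values are illustrative choices, not taken from the config above. Topic names, hosts, and index names are reused from it.

input {
  kafka {
    # Hypothetical group_id and client_id, one pair per input
    group_id => "logstash-first-topic"
    client_id => "logstash-first-topic"
    topics => ["First-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    # @metadata fields are available in the pipeline but are not sent to outputs
    add_field => { "[@metadata][target_index]" => "saga" }
  }
  kafka {
    group_id => "logstash-second-topic"
    client_id => "logstash-second-topic"
    topics => ["Second-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    add_field => { "[@metadata][target_index]" => "logger" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # The index name is resolved per event from the metadata field set above
    index => "%{[@metadata][target_index]}"
  }
  stdout { codec => rubydebug }
}

Adding a third topic then only requires another input block, with no change to the output section.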
Lunatic answered Sep 26 '22 23:09