
How to connect Kafka with Elasticsearch?

I am new to Kafka. I use Logstash to collect NetFlow data and send it to Kafka (this part works), and I want to send the data from Kafka to Elasticsearch, but I am running into problems.
My question is: how can I connect Kafka with Elasticsearch?

Logstash config, NetFlow to Kafka:

input {
  udp {
    host => "120.127.XXX.XX"
    port => 5556
    codec => netflow
  }
}
filter {
}
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "test"
  }
  stdout { codec => rubydebug }
}

Logstash config, Kafka to Elasticsearch:

input {
  kafka { }
}
output {
  elasticsearch {
    hosts => ["120.127.XXX.XX:9200"]
  }
  stdout { codec => rubydebug }
}

log:

D:\ELK\logstash-6.1.1\bin>logstash -f kafkatoES.conf --path.data D:\ELK\logstash-6.1.1\datatest
Sending Logstash's logs to D:/ELK/logstash-6.1.1/logs which is now configured via log4j2.properties
[2018-02-01T18:52:59,713][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"D:/ELK/logstash-6.1.1/modules/fb_apache/configuration"}
[2018-02-01T18:52:59,728][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"D:/ELK/logstash-6.1.1/modules/netflow/configuration"}
[2018-02-01T18:53:00,072][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-02-01T18:53:01,070][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.1"}
[2018-02-01T18:53:01,804][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2018-02-01T18:53:09,024][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://120.127.XX.XX:9200/]}}
[2018-02-01T18:53:09,040][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://120.127.XX.XX:9200/, :path=>"/"}
[2018-02-01T18:53:09,305][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://120.127.XX.XX:9200/"}
[2018-02-01T18:53:09,383][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2018-02-01T18:53:09,383][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-02-01T18:53:09,415][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-02-01T18:53:09,430][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-02-01T18:53:09,493][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//120.127.XXX.XX:9200"]}
[2018-02-01T18:53:09,524][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>2000, :thread=>"#<Thread:0x45e62903 run>"}
[2018-02-01T18:53:09,609][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/logstash-core/lib/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/vendor/jar-dependencies/runtime-jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2018-02-01T18:53:09,789][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-02-01T18:53:09,852][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = logstash-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.11.0.0
[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : cb8625948210849f
[2018-02-01T18:53:10,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Discovered coordinator winoc-netflow:9092 (id: 2147483647 rack: null) for group logstash.
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Revoking previously assigned partitions [] for group logstash
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group logstash with generation 6
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [logstash-0] for group logstash

Thank you in advance!

張皓翔 asked Feb 01 '18 11:02


2 Answers

I would suggest using Kafka Connect and its Elasticsearch sink. I actually presented on exactly this subject last night :) Here are the slides.

You can see a detailed example here.

Update May 2020: See also this tutorial video.

Robin Moffatt answered Oct 30 '22 09:10


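The empty kafka input block will not work for this: with no settings it falls back to the plugin defaults, and your log shows it being assigned the partition logstash-0, i.e. the default topic "logstash" rather than your "test" topic. The minimal config for your scenario will be something like this: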

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => "test"
  }
}
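
Going one step beyond the minimal input, a fuller consumer pipeline for this setup might look like the sketch below. It is only a sketch under a few assumptions: the topic is "test" as in the question; the producing pipeline is changed to use codec => json on its kafka output (with the output's default plain codec the decoded netflow fields are not carried in the message); the group_id value and the index name are made-up examples, not anything from the question.

```conf
input {
  kafka {
    bootstrap_servers => "localhost:9092"    # same broker the producing pipeline writes to
    topics            => ["test"]            # must match topic_id in the kafka output
    group_id          => "logstash-netflow"  # assumed name; any consumer group id works
    auto_offset_reset => "earliest"          # start from the oldest message when the group has no committed offset
    codec             => json                # assumes the producer side also sets codec => json
  }
}

output {
  elasticsearch {
    hosts => ["120.127.XXX.XX:9200"]
    index => "netflow-%{+YYYY.MM.dd}"        # assumed index name, one index per day
  }
  stdout { codec => rubydebug }              # handy while debugging; can be removed later
}
```

On the producing side this would mean adding codec => json inside the kafka { } output block. Note that an index name that does not start with "logstash-" will not pick up the default Logstash index template, so drop the index line if you want to keep that template.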
xeraa answered Oct 30 '22 07:10