Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I implement Kafka Consumer in Scala

I'm trying to implement a kafka consumer in scala. I've seen a million tutorials for how to do it in Java, and even some (like this one) that say it's for scala but it's written in Java.

Does anyone know where I can find an example of how to write it in Scala? I've only just started to learn Scala so maybe the linked example can be used in Scala even though it's written in Java or something, but I honestly have no idea what I'm doing at the moment. Everything I google just links me to how to do it in Java.

like image 530
annedroiid Avatar asked Aug 04 '16 02:08

annedroiid


People also ask

Can we write Kafka code in Scala?

The answer is quite simple: Kafka is written in Java and Scala.

Where can I run Kafka consumer?

Run Kafka Consumer Console Kafka provides the utility kafka-console-consumer.sh which is located at ~/kafka-training/kafka/bin/kafka-console-producer.sh to receive messages from a topic on the command line. Create the file in ~/kafka-training/lab1/start-consumer-console.sh and run it.


2 Answers

The reason you're seeing most of the examples in Java is that the new KafkaProducer starting 0.8.2.2 is written in Java.

Assuming you're using sbt as your build system, and assuming your working with Kafka 0.8.2.2 (you can change the version as needed), you'll need:

libraryDependencies ++= {
  Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2",
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2",
  )
}

A simple example should get you started:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "consumer-tutorial")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])

    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    kafkaConsumer.subscribe("firstTopic", "secondTopic")

    while (true) {
      val results = kafkaConsumer.poll(2000).asScala
      for ((topic, data) <- results) {
        // Do stuff
      }
    }
}
like image 175
Yuval Itzchakov Avatar answered Sep 19 '22 19:09

Yuval Itzchakov


You can also look to a working template totally build on Scala here: https://github.com/knoldus/activator-kafka-scala-producer-consumer This application contains the code that you want to use in here.

I hope I solved your problem thanks !

like image 35
Shivansh Avatar answered Sep 19 '22 19:09

Shivansh