Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can anyone share a Flink Kafka example in Scala?

Can anyone share a working example of Flink Kafka (mainly receiving messages from Kafka) in Scala? I know there is a KafkaWordCount example in Spark. I just need to print out Kafka message in Flink. It would be really helpful.

like image 217
wdz Avatar asked Jul 16 '15 06:07

wdz


People also ask

What is the difference between Kafka and Flink?

Apache Flink is a stream processing framework that can be used easily with Java. Apache Kafka is a distributed stream processing system supporting high fault-tolerance.

Is Scala needed for Kafka?

If you're not familiar with Apache Kafka internal code, you might wonder what Scala and Kafka have to do with each other. The answer is quite simple: Kafka is written in Java and Scala.

What is Kafka Flink?

Apache Kafka Connector. Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly-once guarantees.

Is Flink faster than Kafka?

Latency – No doubt Flink is much faster due to it's architecture and cluster deployment mechanism, Flink throughput in the order of tens of millions of events per second in moderate clusters, sub-second latency that can be as low as few 10s of milliseconds.


2 Answers

The following code shows how to read from a Kafka topic using Flink's Scala DataStream API:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
      .print

    env.execute("Flink Kafka Example")
  }
}
like image 51
Robert Metzger Avatar answered Sep 22 '22 12:09

Robert Metzger


In contrast to what Robert added, below is a piece of application code for sending messages to the Kafka topic.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducer {

  def main(args: Array[String]): Unit = {
    KafkaProducer.sendMessageToKafkaTopic("localhost:9092", "topic_name")
  }    

  def sendMessageToKafkaTopic(server: String, topic:String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String,String](props)
    val record = new ProducerRecord[String,String](topic, "HELLO WORLD!")
    producer.send(record)
    producer.close()
  }
}
like image 33
Oskarro Avatar answered Sep 19 '22 12:09

Oskarro