Can anyone share a working example of Flink Kafka (mainly receiving messages from Kafka) in Scala? I know there is a KafkaWordCount example in Spark. I just need to print out Kafka message in Flink. It would be really helpful.
Apache Flink is a stream processing framework that can be used easily with Java. Apache Kafka is a distributed stream processing system supporting high fault-tolerance.
If you're not familiar with Apache Kafka internal code, you might wonder what Scala and Kafka have to do with each other. The answer is quite simple: Kafka is written in Java and Scala.
Apache Kafka Connector. Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly-once guarantees.
Latency – No doubt Flink is much faster due to it's architecture and cluster deployment mechanism, Flink throughput in the order of tens of millions of events per second in moderate clusters, sub-second latency that can be as low as few 10s of milliseconds.
The following code shows how to read from a Kafka topic using Flink's Scala DataStream API:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
.addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
.print
env.execute("Flink Kafka Example")
}
}
In contrast to what Robert added, below is a piece of application code for sending messages to the Kafka topic.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducer {
def main(args: Array[String]): Unit = {
KafkaProducer.sendMessageToKafkaTopic("localhost:9092", "topic_name")
}
def sendMessageToKafkaTopic(server: String, topic:String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", servers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
val record = new ProducerRecord[String,String](topic, "HELLO WORLD!")
producer.send(record)
producer.close()
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With