I am trying to read records from a Kafka topic using Spark Streaming.
This is my code:
import java.util.UUID

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaConsumer {
  import ApplicationContext._

  def main(args: Array[String]) = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("pressure")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.print()
    stream.map(record => (record.key, record.value)).count().print()
    streamingContext.start()
  }
}
It displays nothing when I run this.
To check if data is actually present in the pressure topic, I used the command-line consumer, and it does display records:
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic pressure \
--from-beginning
Output:
TimeStamp:07/13/16 15:20:45:226769,{'Pressure':'834'}
TimeStamp:07/13/16 15:20:45:266287,{'Pressure':'855'}
TimeStamp:07/13/16 15:20:45:305694,{'Pressure':'837'}
What's wrong?
As a side note on the newer API: with Structured Streaming you can load a streaming Dataset from Kafka using SparkSession.readStream, giving it the Kafka bootstrap server addresses and the topic names to subscribe to. The subscribePattern option instead matches topic names against a pattern and reads all events in all partitions of the matched topics. Setting startingOffsets to earliest reads all data already available in Kafka when the query starts; you may not need that very often, since the default, latest, reads only data that has not been processed yet. A streaming DataFrame can also be written back to another Kafka topic with DataFrame.writeStream. Either way you can consume a single topic or multiple topics, and Spark tracks the received offsets so data loss on recovery is kept to a minimum.
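A minimal sketch of that Structured Streaming approach, assuming a SparkSession named spark already exists and reusing this question's pressure topic (the output topic and checkpoint path are placeholders):

// Structured Streaming sketch, assuming an existing SparkSession called spark.
// Names other than "pressure" are placeholders for this question's setup.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "pressure")          // or .option("subscribePattern", "press.*")
  .option("startingOffsets", "earliest")    // default is "latest"
  .load()

// Kafka delivers binary key/value columns; cast them to strings for display.
val records = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Print incoming records to the console...
records.writeStream
  .format("console")
  .start()

// ...and/or write them back out to another Kafka topic.
records.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "pressure-copy")
  .option("checkpointLocation", "/tmp/pressure-copy-checkpoint")
  .start()

spark.streams.awaitAnyTermination()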
You're missing streamingContext.awaitTermination(). Without it, main returns immediately after streamingContext.start(), the driver exits, and no micro-batches are ever processed, which is why nothing is printed.
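In other words, the end of main should look something like this (everything else in your code stays the same):

    stream.print()
    stream.map(record => (record.key, record.value)).count().print()

    streamingContext.start()
    // Block the main thread so the driver stays up and micro-batches run;
    // without this, main returns right after start() and nothing is printed.
    streamingContext.awaitTermination()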