
How to read records from Kafka topic from beginning in Spark Streaming?

I am trying to read records from a Kafka topic using Spark Streaming.

This is my code:

import java.util.UUID

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaConsumer {

  import ApplicationContext._

  def main(args: Array[String]) = {

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("pressure")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.print()
    stream.map(record => (record.key, record.value)).count().print()
    streamingContext.start()
  }
}

It displays nothing when I run this.

To check if data is actually present in the pressure topic, I used the command line approach and it does display records:

bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic pressure \
  --from-beginning

Output:

TimeStamp:07/13/16 15:20:45:226769,{'Pressure':'834'}
TimeStamp:07/13/16 15:20:45:266287,{'Pressure':'855'}
TimeStamp:07/13/16 15:20:45:305694,{'Pressure':'837'}

What's wrong?

asked Nov 27 '16 by Shivansh

People also ask

How to read from Kafka for streaming queries in spark?

To read from Kafka for streaming queries, we can use SparkSession.readStream. Kafka server addresses and topic names are required. By default it reads all events in all partitions of the subscribed topics; the subscribePattern option can be used instead to match topic names against a pattern.
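As a minimal sketch of that API, assuming the local broker and `pressure` topic from the question, and that the `spark-sql-kafka-0-10` connector is on the classpath:

```scala
import org.apache.spark.sql.SparkSession

object ReadKafkaStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadKafkaStream")
      .master("local[*]")
      .getOrCreate()

    // Subscribe to the topic; rows arrive with binary `key` and `value` columns.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "pressure")
      .load()

    val records = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Print each micro-batch to the console and block until the query stops.
    records.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```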

What are the benefits of Apache Spark and Apache Kafka integration?

There are several benefits of implementing Spark Kafka Integration: by setting up Spark Streaming and Kafka Integration, you can ensure minimum data loss through Spark Streaming while saving all the received Kafka data synchronously for easy recovery. Users can read messages from a single Kafka topic or from multiple topics.

How to write to a Kafka topic from a Dataframe?

For a streaming-sourced DataFrame, we can use the DataFrame.writeStream function directly to write into a Kafka topic. For example, a query can read stream data from topic 'kontext-kafka' and write it into another topic named 'kontext-kafka-3'.
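A sketch of such a topic-to-topic copy, using the topic names from the snippet above and assuming an active SparkSession `spark` and a local broker:

```scala
// Read a stream from one topic.
val in = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "kontext-kafka")
  .load()

// The Kafka sink expects `key` and `value` columns (string or binary),
// and a streaming write to Kafka requires a checkpoint location.
in.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "kontext-kafka-3")
  .option("checkpointLocation", "/tmp/kafka-copy-checkpoint")
  .start()
  .awaitTermination()
```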

What is startingoffsets earliest in Spark Streaming?

Spark Structured Streaming uses readStream() on SparkSession to load a streaming Dataset from Kafka. The option startingOffsets set to earliest reads all data available in Kafka at the start of the query. We may not use this option that often: the default value for startingOffsets is latest, which reads only new data that has not yet been processed.
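Set on the question's topic, the option looks like this (a fragment, assuming an existing SparkSession `spark`):

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "pressure")
  // Read from the beginning of the topic; the default is "latest".
  .option("startingOffsets", "earliest")
  .load()
```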


1 Answer

You're missing streamingContext.awaitTermination(). Without it, main() returns immediately after start(), so the application shuts down before any micro-batch is processed.
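Applied to the code in the question, the end of main would look like this:

```scala
stream.map(record => (record.key, record.value)).count().print()

streamingContext.start()
// Block the main thread so the streaming job keeps running;
// this also rethrows any exception raised during execution.
streamingContext.awaitTermination()
```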

answered Oct 20 '22 by Yuval Itzchakov