I am trying to read older messages from Kafka with Spark Streaming. However, I am only able to retrieve messages as they arrive in real time (i.e., if I produce new messages while my Spark program is running, then I get those messages).
I am changing my group ID and consumer ID on each run to make sure ZooKeeper isn't just withholding messages it knows my program has already seen.
Assuming Spark sees the offset in ZooKeeper as -1, shouldn't it read all the old messages in the queue? Am I just misunderstanding the way a Kafka queue can be used? I'm very new to Spark and Kafka, so I can't rule out that I'm just misunderstanding something.
package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats

  // Placeholder: pass the JSON through unchanged for now.
  def parser(json: String): String = json

  def main(args: Array[String]) {

    val zkQuorum = "test-spark02:9092"
    val group = "myGroup99"
    val topic = Map("testtopic" -> 1)

    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

    val gp = json_stream.map(_._2).map(parser)
    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")

    ssc.start()
    ssc.awaitTermination()  // keep the driver alive so batches continue to run
  }
}
When running this, I see the following message, so I am confident it's not simply skipping messages because an offset is already set.
14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] )
Then, if I produce 1000 new messages, I can see those 1000 messages saved in my temp directory. But I don't know how to read the pre-existing messages, which should (at this point) number in the tens of thousands.
Kafka processes events as they unfold; it employs a continuous (event-at-a-time) processing model. Spark Streaming, on the other hand, uses a micro-batch approach, dividing incoming streams into small batches for processing.
Spark Streaming receives real-time data and divides it into small batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing; its queries are optimized by the Spark Catalyst optimizer and translated into RDD operations for execution under the hood.
Kafka acts as the central hub for real-time streams of data, which are processed with complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can publish results to yet another Kafka topic or store them in HDFS, databases, or dashboards.
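As a rough sketch of that hub-and-spoke pattern (the host, group, topic name, and output path are placeholders; this uses the same Spark 1.x DStream API as the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaToHdfsSketch {
  def main(args: Array[String]) {
    // Micro-batch model: the Kafka stream is sliced into 5-second batches.
    val conf = new SparkConf().setAppName("KafkaToHdfsSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka is the hub: consume a topic, transform each batch, persist the results.
    val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "sketch-group", Map("events" -> 1))
    stream.map(_._2)  // drop the key, keep the message payload
          .saveAsTextFiles("hdfs:///tmp/events/out", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}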
Use the alternative factory method on KafkaUtils that lets you provide a configuration to the Kafka consumer:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[(K, V)]
Then build a map with your Kafka configuration and add the parameter 'auto.offset.reset' set to 'smallest':

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> groupId,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"
)
Provide that config to the factory method above. "auto.offset.reset" -> "smallest" tells the consumer to start from the smallest (earliest) offset in your topic. Note that this setting only takes effect when the group has no offset committed in ZooKeeper, so keep using a fresh group.id as you already do.
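For reference, here's how the full call might look with your snippet's zkQuorum, group, and topic values (a sketch: the String decoders and the MEMORY_AND_DISK_SER_2 storage level mirror the defaults of the simpler overload):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"  // start from the earliest offset when the group has none committed
)

// Type parameters are given explicitly because this overload can't infer them.
val json_stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topic, StorageLevel.MEMORY_AND_DISK_SER_2)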