I have a Kafka topic I want to read from the earliest event.
What I want to do is get all the data from the topic, from the absolute earliest event in time, up to the events at a certain date. Each event has a field called dateCliente which I use as the threshold to filter events. So far I've managed to complete the read and write: I write to a temporary Parquet file which I then use as a partition for a Hive table. This works fine; however, even though I have specified earliest in the auto.offset.reset parameter, it's not reading data from the start.
Whenever I run my code I get all the events starting from this date, and every time I execute the code again it keeps reading from the Kafka event following the last one read in the previous execution.
The code I'm using to configure the Kafka Consumer and subscribe to the topics is the following:
// Required imports (Typesafe Config, Kafka client and Spark)
import java.util
import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession

// Configuration for the Kafka consumer, read from properties.conf
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")

// Create the Spark session
val spark = SparkSession
  .builder()
  .master("yarn")
  .appName("XY")
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

// Consumer properties: deserializers, brokers, offset reset policy and group id
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "XY")

// Create the consumer and subscribe to the topic
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(util.Collections.singletonList("geoevents"))
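The poll loop that applies the dateCliente threshold isn't shown in the question; for context, a minimal sketch of what such a loop could look like follows (the cutoff date, the extractDateCliente helper and the assumed JSON layout of the value are illustrative, not the real job):
// Hypothetical consumption loop: poll records, keep only those whose dateCliente
// is before the cutoff date, and stop once the cutoff is reached.
import java.time.LocalDate
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Crude extraction of an ISO date from a JSON value such as {"dateCliente":"2018-05-30", ...}
val dateFieldPattern = """"dateCliente"\s*:\s*"(\d{4}-\d{2}-\d{2})""".r
def extractDateCliente(json: String): Option[LocalDate] =
  dateFieldPattern.findFirstMatchIn(json).map(m => LocalDate.parse(m.group(1)))

val cutoff = LocalDate.of(2018, 6, 1)   // assumed threshold date
val kept = ArrayBuffer.empty[String]
var done = false

while (!done) {
  val records = consumer.poll(5000L)    // poll(long) also works on older client versions
  if (records.isEmpty) done = true
  for (record <- records.asScala) {
    extractDateCliente(record.value()) match {
      case Some(d) if d.isBefore(cutoff) => kept += record.value()
      case Some(_)                       => done = true   // reached the threshold date
      case None                          => ()            // skip records without a parsable date
    }
  }
}
// kept would then be turned into a DataFrame and written to the temporary Parquet partition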
However, whenever I create a consumer from the command line to read the data from the topic, I do get all the events from the previous days. The command I run is:
kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx
Any ideas why my code is behaving like that and ignoring the "earliest" in auto.offset.reset?
It's because auto.offset.reset is only applied if there are no committed offsets for the group.
See the consumer configs documentation:
"What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server"
If you want to restart from the beginning, you can either:
- use a new group name (for example, append System.currentTimeMillis() to the group name), or
- explicitly move the position of the consumer to the start of the partitions using seekToBeginning() (sketched below): http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning-java.util.Collection-
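A minimal sketch of the seekToBeginning() approach, reusing the consumer and topic from the question. Because subscribe() assigns partitions lazily, the seek is done from a ConsumerRebalanceListener once partitions are actually assigned:
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener

// Subscribe with a rebalance listener so the seek runs only after partitions
// have actually been assigned to this consumer instance.
consumer.subscribe(
  util.Collections.singletonList("geoevents"),
  new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
      // rewind every assigned partition to its earliest offset, ignoring committed offsets
      consumer.seekToBeginning(partitions)
  }
)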
The property auto.offset.reset is used only when there is no offset stored in Kafka for the given consumer group. When you commit a record, Kafka stores its offset in a special internal topic, and on the next run your consumer reads the topic from the last committed offset onwards. To read from the beginning you should either call consumer.seekToBeginning or use a unique group.id property.
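For example, generating a throwaway group id per run (a minimal sketch reusing the properties from the question; the "XY-" prefix is just illustrative) leaves the group with no committed offsets, so auto.offset.reset = earliest takes effect:
// A fresh group.id each run has no committed offsets, so "earliest" kicks in
properties.put("group.id", "XY-" + System.currentTimeMillis())
properties.put("auto.offset.reset", "earliest")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(util.Collections.singletonList("geoevents"))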