I am new to Spark, so apologies for asking such a basic question. I have a use case where I want to read data from a specific partition of a Kafka topic using Spark Streaming. I am using the Spark Java API for all of this.
I have created a topic named test with a replication factor of 2 and 5 partitions. With the help of the Spark Streaming + Kafka integration guide, I am able to create a JavaStreamingContext object, create a direct stream to the Kafka broker, and read all the messages from all partitions.
But this still does not cover my use case: I need to read only the messages of one particular partition of the topic, not all messages from all partitions.
You should be able to read a specific partition from a specific offset using the following code.
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Map each (topic, partition) pair to the offset you want to start reading from
Map<TopicAndPartition, Long> consumerOffsets = new HashMap<TopicAndPartition, Long>();
TopicAndPartition p1 = new TopicAndPartition("test", 0); // the partition is an int, e.g. partition 0 of your topic "test"
consumerOffsets.put(p1, 0L);                             // start from offset 0 (or wherever you left off)

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        consumerOffsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            public String call(MessageAndMetadata<String, String> msgAndMeta) throws Exception {
                return msgAndMeta.message();
            }
        }
);
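For context, here is a minimal sketch of the pieces the snippet above assumes: the jssc streaming context and the kafkaParams map, plus starting the stream once it is defined. The local[2] master, the 2-second batch interval, and the broker address localhost:9092 are assumptions you would replace with your own values; the jssc/kafkaParams setup goes before the createDirectStream call, and the start/await calls come after it.
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("single-partition-reader").setMaster("local[2]"); // assumed master
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));            // assumed batch interval

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker address

// ... createDirectStream call from above goes here ...

messages.print();          // each batch now holds only records from partition 0 of "test"
jssc.start();
jssc.awaitTermination();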