I am using Spark to consume data from Kafka and save it in Cassandra. My program is written in Java and uses the spark-streaming-kafka_2.10:1.6.2 library. My code is:
SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);

JavaPairReceiverInputDStream<String, EventLog> messages =
        KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
    @Override
    public EventLog call(Tuple2<String, EventLog> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(rdd -> {
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
});

jssc.start();
In my Cassandra table event_log, there is a column named offsetid that stores the offset ID of the stream. How do I get the offset up to which this stream has read from Kafka, and store it in Cassandra? After saving it in Cassandra, I want to use that latest offset ID as the starting point when Spark is started again. How do I do that?
Answer: Consumer offsets are, in general, stored in the internal topic __consumer_offsets. A consumer offset behaves like a bookmark: it marks the point from which the consumer resumes reading messages after it left off.
Manual commits: You can call commitSync() or commitAsync() on the KafkaConsumer at any time. When you issue the call, the consumer takes the offset of the last message received during a poll() and commits it to the Kafka broker.
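For reference, here is a minimal sketch of a manual commit using the standalone KafkaConsumer API (Kafka 0.9+). This is separate from the Spark job above, and the broker address, group id, and topic name are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "GROUP");
        // Disable auto-commit so offsets only advance when we commit explicitly.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topicName"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // process the record ...
            }
            // Synchronously commit the offsets of the messages returned by the last poll().
            consumer.commitSync();
        }
    }
}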
Below is code for reference; you may need to change things as per your requirements. The approach is to maintain, for each topic, the Kafka partition-wise offsets in Cassandra (this could also be done in ZooKeeper using its Java API). With each batch of string messages received, the latest offset range for the topic is stored or updated in the event_log table. So, on startup, always read the offsets from the table first: if they are present, create a direct stream starting from those offsets, otherwise create a fresh direct stream.
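The snippet assumes an EventLog bean whose fields hold the topic name, partition, and offset range, mapped to the event_log table. A minimal sketch of such a bean might look like this (the field names follow the getters and setters used below; the actual table schema and column mapping are assumptions you will need to adapt):

// Hypothetical EventLog bean assumed by the snippet below. The test.event_log table
// needs columns that line up with these fields under the connector's column mapping.
public class EventLog implements java.io.Serializable {
    private String topicName;
    private String partition;
    private String fromOffset;
    private String untilOffset;

    public String getTopicName() { return topicName; }
    public void setTopicName(String topicName) { this.topicName = topicName; }
    public String getPartition() { return partition; }
    public void setPartition(String partition) { this.partition = partition; }
    public String getFromOffset() { return fromOffset; }
    public void setFromOffset(String fromOffset) { this.fromOffset = fromOffset; }
    public String getUntilOffset() { return untilOffset; }
    public void setUntilOffset(String untilOffset) { this.untilOffset = untilOffset; }
}

The full example then looks like this: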
package com.spark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;
public class KafkaChannelFetchOffset {

    public static void main(String[] args) {
        String topicName = "topicName";
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName));
        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "127.0.0.1");
        kafkaParams.put("group.id", "GROUP");
        kafkaParams.put("metadata.broker.list", "127.0.0.1");

        // Read the previously stored offset ranges (if any) for this topic from Cassandra.
        List<EventLog> eventLogList = javaFunctions(jssc.sparkContext())
                .cassandraTable("test", "event_log", mapRowTo(EventLog.class))
                .select("topicName", "partition", "fromOffset", "untilOffset")
                .where("topicName=?", topicName).collect();
        JavaDStream<String> kafkaOutStream = null;
        if (eventLogList == null || eventLogList.isEmpty()) {
            // No offsets stored yet: start a fresh direct stream.
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topicsSet).transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
                        @Override
                        public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception {
                            // The offset ranges are only available on the original KafkaRDD,
                            // so read them before mapping it to a plain RDD of message values.
                            OffsetRange[] offsetRanges = ((HasOffsetRanges) pairRdd.rdd()).offsetRanges();
                            JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() {
                                @Override
                                public String call(Tuple2<String, String> arg0) throws Exception {
                                    return arg0._2;
                                }
                            });
                            writeOffset(rdd, offsetRanges);
                            return rdd;
                        }
                    });
        } else {
            // Resume from the offset ranges stored in Cassandra.
            for (EventLog eventLog : eventLogList) {
                kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())),
                        Long.parseLong(eventLog.getUntilOffset()));
            }
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
                    kafkaParams, kafkaTopicPartition, new Function<MessageAndMetadata<String, String>, String>() {
                        @Override
                        public String call(MessageAndMetadata<String, String> arg0) throws Exception {
                            return arg0.message();
                        }
                    }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                        @Override
                        public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                            writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                            return rdd;
                        }
                    });
        }
        // Use kafkaOutStream for further processing.
        jssc.start();
        jssc.awaitTermination();
    }
    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) {
        // Convert each offset range into an EventLog row and save the batch to Cassandra.
        List<EventLog> eventLogs = new ArrayList<>();
        for (OffsetRange offsetRange : offsets) {
            EventLog eventLog = new EventLog();
            eventLog.setTopicName(offsetRange.topic());
            eventLog.setPartition(String.valueOf(offsetRange.partition()));
            eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset()));
            eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset()));
            eventLogs.add(eventLog);
        }
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
        javaFunctions(jsc.parallelize(eventLogs))
                .writerBuilder("test", "event_log", mapToRow(EventLog.class))
                .saveToCassandra();
    }
}
Hope this helps and resolves your problem.
So, you want to manage Kafka offsets on your own. For this:

- Use createDirectStream instead of createStream. That allows you to specify from which offsets you would like to read (fromOffsets: Map[TopicAndPartition, Long]).
- Collect information about the offsets you have already processed. This can be done by saving the offsets for each message, or by keeping this information aggregated in a separate table. To get the offset ranges from an RDD in Scala: rdd.asInstanceOf[HasOffsetRanges].offsetRanges. For Java (per the documentation at http://spark.apache.org/docs/latest/streaming-kafka-integration.html): OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); A short sketch combining both steps follows below.
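Putting both steps together, here is a minimal sketch using the Spark 1.6 / Kafka 0.8 direct API that the question uses. The broker address, topic name, partition, and starting offsets are placeholders; in practice the fromOffsets map would be loaded from your Cassandra table, and the loop body would write the ranges back to it:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class DirectStreamFromOffsets {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("offset-demo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");

        // Placeholder starting point: in practice, load these from your offset table.
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        fromOffsets.put(new TopicAndPartition("topicName", 0), 0L);

        JavaInputDStream<String> stream = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
                kafkaParams, fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    @Override
                    public String call(MessageAndMetadata<String, String> mmd) {
                        return mmd.message();
                    }
                });

        stream.foreachRDD(rdd -> {
            // RDDs produced by the direct stream implement HasOffsetRanges.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsets) {
                // Persist o.topic(), o.partition(), o.fromOffset(), o.untilOffset()
                // back to your offset table here.
            }
            // ... process the messages in rdd ...
        });

        jssc.start();
        jssc.awaitTermination();
    }
}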