How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka?

I am using Spark to consume data from Kafka and save it in Cassandra. My program is written in Java. I am using the spark-streaming-kafka_2.10:1.6.2 lib to accomplish this. My code is:

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
// topicMap (a Map<String, Integer> of topic name -> number of receiver threads) is defined elsewhere
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
    @Override
    public EventLog call(Tuple2<String, EventLog> tuple2) {
        return tuple2._2();
    }
});
lines.foreachRDD(rdd -> {
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
});
jssc.start();
jssc.awaitTermination(); // keep the streaming context running

In my Cassandra table event_log, there is a column named offsetid to store the offset ID of the stream. How do I get the offset up to which this stream has read from Kafka and store it in Cassandra?

After saving it in Cassandra, I want to use the latest offset id when Spark is started again. How do I do that?

asked Aug 26 '16 by khateeb

People also ask

How do I get Kafka topic offset?

If your Kafka topic is in Confluent Cloud, use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset. You can also read messages from a specified partition and offset using the Confluent Cloud Console.
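
If you want to do the same thing from code, here is a minimal sketch using the plain Java KafkaConsumer API with assign() and seek(); the broker address, topic name, partition number, and offset are placeholders:

// assumes java.util.Properties, java.util.Collections, org.apache.kafka.clients.consumer.* and
// org.apache.kafka.common.TopicPartition imports
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition = new TopicPartition("my-topic", 0); // placeholder topic and partition
consumer.assign(Collections.singletonList(partition));        // manual assignment, no subscription
consumer.seek(partition, 42L);                                // start reading at offset 42

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
}
consumer.close();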

Where Kafka offset is stored?

Consumer offsets are generally stored in the internal topic __consumer_offsets. The consumer offset behaves like a bookmark that lets the consumer start reading messages from the point where it left off.

How do I manually commit Kafka offset?

Manual commits: You can call a commitSync() or commitAsync() method anytime on the KafkaConsumer . When you issue the call, the consumer will take the offset of the last message received during a poll() and commit that to the Kafka server.
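
As a rough sketch of that pattern (broker, group id, and topic name are placeholders, and process() stands in for your own handling logic):

// assumes the same java.util and org.apache.kafka.clients.consumer imports as the sketch above
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
props.put("group.id", "my-group");                 // placeholder consumer group
props.put("enable.auto.commit", "false");          // take over offset management
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical handler for your own logic
    }
    consumer.commitSync(); // commit the offsets returned by the last poll() once the batch is processed
}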


2 Answers

Below is the code for reference; you may need to adjust it to your requirements. The approach is to maintain Kafka offsets per topic and partition in Cassandra (this could also be done in ZooKeeper using its Java API). The latest offset range for each topic is stored or updated in the event_log table as messages arrive. On startup, read the table first: if offsets are present, create a direct stream from those offsets; otherwise create a fresh direct stream.

package com.spark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

public class KafkaChannelFetchOffset {
    public static void main(String[] args) {
        String topicName = "topicName";
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName));
        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "127.0.0.1");
        kafkaParams.put("group.id", "GROUP");
        kafkaParams.put("metadata.broker.list", "127.0.0.1");
        List<EventLog> eventLogList = javaFunctions(jssc.sparkContext()).cassandraTable("test", "event_log", mapRowTo(EventLog.class))
                .select("topicName", "partition", "fromOffset", "untilOffset").where("topicName=?", topicName).collect();
        JavaDStream<String> kafkaOutStream = null;
        if (eventLogList == null || eventLogList.isEmpty()) {
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                    topicsSet).transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception {
                    JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() {
                        @Override
                        public String call(Tuple2<String, String> arg0) throws Exception {
                            return arg0._2;
                        }
                    });
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                }
            });
        } else {
            for (EventLog eventLog : eventLogList) {
                kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())),
                        Long.parseLong(eventLog.getUntilOffset()));
            }
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
                    kafkaParams, kafkaTopicPartition, new Function<MessageAndMetadata<String, String>, String>() {
                        @Override
                        public String call(MessageAndMetadata<String, String> arg0) throws Exception {
                            return arg0.message();
                        }
                    }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {

                @Override
                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                }
            });
        }
        // Use kafkaOutStream for further processing.
        jssc.start();
    }

    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) {
        // Build one EventLog row per partition offset range and save them all to Cassandra.
        List<EventLog> eventLogs = new ArrayList<>();
        for (OffsetRange offsetRange : offsets) {
            EventLog eventLog = new EventLog();
            eventLog.setTopicName(offsetRange.topic());
            eventLog.setPartition(String.valueOf(offsetRange.partition()));
            eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset()));
            eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset()));
            eventLogs.add(eventLog);
        }
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
        javaFunctions(jsc.parallelize(eventLogs)).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
    }
}
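
The code above assumes an EventLog bean and a test.event_log table that are not shown in the question or answer, so the following is only an assumed sketch of the offset-tracking part (the table layout in the comment is likewise an assumption):

// Assumed table, roughly:
// CREATE TABLE test.event_log (topicName text, partition text, fromOffset text, untilOffset text,
//                              PRIMARY KEY (topicName, partition));

// Assumed JavaBean mapped to that table via mapRowTo/mapToRow.
public class EventLog implements java.io.Serializable {
    private String topicName;
    private String partition;
    private String fromOffset;
    private String untilOffset;

    public String getTopicName() { return topicName; }
    public void setTopicName(String topicName) { this.topicName = topicName; }

    public String getPartition() { return partition; }
    public void setPartition(String partition) { this.partition = partition; }

    public String getFromOffset() { return fromOffset; }
    public void setFromOffset(String fromOffset) { this.fromOffset = fromOffset; }

    public String getUntilOffset() { return untilOffset; }
    public void setUntilOffset(String untilOffset) { this.untilOffset = untilOffset; }
}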

Hope this helps and resolves your problem.

answered Sep 19 '22 by Anupam Jain

So, you want to manage Kafka offsets on your own.

For this:

  1. Use createDirectStream instead of createStream. That allows you to specify the offsets you want to start reading from (fromOffsets: Map[TopicAndPartition, Long]).

  2. Collect information about the offsets you have already processed. That can be done by saving the offset of each message, or by aggregating this information in a separate table. To get the offset ranges from an RDD in Scala: rdd.asInstanceOf[HasOffsetRanges].offsetRanges. In Java (per the documentation at http://spark.apache.org/docs/latest/streaming-kafka-integration.html): OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();. Both steps are sketched below.
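
Putting both points together, a minimal Java sketch might look like the following; jssc is an existing JavaStreamingContext, the broker list and topic are placeholders, and loadSavedOffsets()/saveOffsets() are hypothetical helpers you would back with Cassandra (or any other store):

// assumes the spark-streaming-kafka imports from the answer above, plus org.apache.spark.streaming.api.java.JavaInputDStream
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker list

// 1. Restore previously saved offsets (one entry per partition of the topic).
Map<TopicAndPartition, Long> fromOffsets = loadSavedOffsets("my-topic"); // hypothetical helper

JavaInputDStream<String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
        kafkaParams, fromOffsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            @Override
            public String call(MessageAndMetadata<String, String> msg) throws Exception {
                return msg.message();
            }
        });

// 2. Capture the offset ranges of each batch and persist them after processing.
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // ... process rdd ...
    saveOffsets(offsetRanges); // hypothetical helper that writes the ranges to Cassandra
});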

answered Sep 17 '22 by Natalia