
Combine results from batch RDD with streaming RDD in Apache Spark

Context: I am using Apache Spark to aggregate a running count of different event types from logs. The logs are stored in Cassandra for historical analysis and in Kafka for real-time analysis. Each log has a date and an event type. For simplicity, let's assume I want to keep track of the number of logs of a single type for each day.

We have two RDDs: a batch RDD of data from Cassandra and a streaming RDD (DStream) from Kafka. Pseudocode:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) {
        return new Tuple2<String, Integer>(row.getString("date"), 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
});

save(batchRDD) // Assume this saves the batch RDD somewhere

...

// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream =  KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
    @Override
    public Iterable<Tuple2<String, Integer>> call(Tuple2<String, String> data) {
        String jsonString = data._2;
        JSON jsonObj = JSON.parse(jsonString);
        Date eventDate = ... // get date from json object
        // Assume startTime is broadcast variable that is set to the time when the job started.
        if (eventDate.after(startTime.value())) { 
            ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
            pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
            return pairs;
        } else {
            return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
        }
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
}).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
        Integer previousValue = state.or(0);
        Integer currentValue = ... // Sum of counts
        return Optional.of(previousValue + currentValue);
    }
});
save(streamRDD); // Assume this saves the stream RDD somewhere

jssc.start(); // jssc is the JavaStreamingContext used to create kafkaStream
jssc.awaitTermination();

Question: How do I combine the results from the streamRDD with the batchRDD? Let's say that batchRDD has the following data and this job was run on 2014-10-16:

("2014-10-15", 1000000)
("2014-10-16", 2000000)

Since the Cassandra query only includes data up to the start time of the batch query, we have to read from Kafka once that query is finished, considering only logs that arrive after the job's start time. We assume the batch query takes a long time. This means I need to combine the historical results with the streaming results.

For illustration:

    |------------------------|-------------|--------------|--------->
tBatchStart             tStreamStart   streamBatch1  streamBatch2

Then suppose that in the first stream batch we got this data:

("2014-10-19", 1000)

Then I want to combine the batch RDD with this stream RDD so that the stream RDD now has the value:

("2014-10-19", 2001000)

Then suppose that in the second stream batch we got this data:

("2014-10-19", 4000)

Then the stream RDD should be updated to have the value:

("2014-10-19", 2005000)

And so on...

It's possible to use streamRDD.transformToPair(...) to combine the streamRDD data with the batchRDD data using a join, but if we did that for every stream chunk we would be adding the batchRDD count on every chunk, "double counting" it into the state value, when it should only be added to the first stream chunk.
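For example, here is a rough Scala-style sketch of the pattern I'm worried about (streamCounts is a hypothetical stand-in for the per-interval reduced (date, count) pairs from the Kafka DStream; batchRDD is the batch counts from above):

// Join each micro-batch's counts with the batch totals, then accumulate into the state.
val joined = streamCounts.transform { rdd =>
  rdd.join(batchRDD).map { case (date, (streamCount, batchCount)) => (date, streamCount + batchCount) }
}
val state = joined.updateStateByKey[Int] { (values: Seq[Int], prev: Option[Int]) =>
  Some(values.sum + prev.getOrElse(0))
}
// Interval 1: state("2014-10-16") = 1000 + 2000000              = 2001000  (correct)
// Interval 2: state("2014-10-16") = 2001000 + (4000 + 2000000)  = 4005000  (baseline counted twice; 2005000 expected)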

asked Oct 23 '14 by Bobby



2 Answers

To address this case, I'd union the base RDD with the result of the aggregated StateDStream that keeps the running totals of the streaming data. This effectively provides a baseline for the data reported on every streaming interval, without counting that baseline more than once.

I tried that idea using the sample WordCount and it works. Drop this in the REPL for a live example:

(use nc -lk 9876 in a separate shell to provide input to the socketTextStream)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

// Baseline totals, standing in for the batch (historical) counts.
@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7)
val defaultRdd = sc.parallelize(defaults)

@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
// Cumulative count of the streaming data only.
val historicCount = wordCount.updateStateByKey[Int]{ (newValues: Seq[Int], runningCount: Option[Int]) =>
    Some(newValues.sum + runningCount.getOrElse(0))
}
// Streaming totals plus the baseline, combined fresh on every interval.
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd) }.reduceByKey(_ + _)

wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()
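To see the arithmetic of the union step in isolation (no streaming involved; the cumulative totals below are made-up stand-ins for what historicCount might hold after some interval), the same combination can be checked on plain RDDs in the REPL:

val baseline = sc.parallelize(List("magic" -> 2, "face" -> 5, "dust" -> 7))
// Pretend these are the cumulative streaming counts kept by updateStateByKey so far.
val streamTotalsSoFar = sc.parallelize(List("magic" -> 2, "face" -> 1))
streamTotalsSoFar.union(baseline).reduceByKey(_ + _).collect().foreach(println)
// Prints, in some order: (magic,4), (face,6), (dust,7)
// The baseline is added once to the cumulative totals on each output and is never
// folded back into the state, so it cannot be double counted across intervals.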
answered by maasg


You could give updateStateByKey a try:

def main(args: Array[String]) {

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
    }

    // stream
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc)
    stateWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
}
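Note that updateStateByKey on its own only accumulates the streaming counts; it does not fold in the historical totals from Cassandra. If your Spark version has the updateStateByKey overload that also accepts an initialRDD (added in Spark 1.3, if I remember the release right), one way to cover that is to seed the state with the batch counts. A minimal sketch, reusing pairs and updateFunc from above and assuming a hypothetical batchBaseline RDD standing in for the historical (key, count) totals:

import org.apache.spark.HashPartitioner

// Hypothetical baseline standing in for the counts loaded from Cassandra.
val batchBaseline = ssc.sparkContext.parallelize(Seq(("spark", 100), ("streaming", 42)))

// The state starts from the batch totals, so the baseline is included exactly once
// and every streaming interval only adds its own counts on top.
val seededCounts = pairs.updateStateByKey[Int](
    updateFunc,
    new HashPartitioner(ssc.sparkContext.defaultParallelism),
    batchBaseline)
seededCounts.print()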
answered by mithra