
How to carry data streams over multiple batch intervals in Spark Streaming

I'm using Apache Spark Streaming 1.6.1 to write a Java application that joins two key/value data streams and writes the output to HDFS. The two data streams contain K/V strings and are periodically ingested into Spark from HDFS using textFileStream().

The two data streams aren't synchronized, which means that some keys that are in stream1 at time t0 may appear in stream2 at time t1, or vice versa. Hence, my goal is to join the two streams and compute the "leftover" keys, which should be considered for the join operation in the next batch intervals.

To better clarify this, look at the following algorithm:

variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
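
To make this concrete, here is one batch worked through on plain RDDs (in Scala for brevity); all keys and values below are made-up sample data:

val s1 = sc.parallelize(Seq(("k1", "a"), ("k2", "b")))  // stream1 + left_keys_s1
val s2 = sc.parallelize(Seq(("k2", "x"), ("k3", "y")))  // stream2 + left_keys_s2

val outStream  = s1.join(s2)                  // {(k2, (b, x))}, written to HDFS
val leftKeysS1 = s1.subtractByKey(outStream)  // {(k1, a)}, carried over to time t2
val leftKeysS2 = s2.subtractByKey(outStream)  // {(k3, y)}, carried over to time t2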

I've tried to implement this algorithm with Spark Streaming unsuccessfully. Initially, I create two empty streams for leftover keys in this way (this is only one stream, but the code to generate the second stream is similar):

JavaRDD<String> empty_rdd = sc.emptyRDD(); // sc = JavaSparkContext
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public scala.Tuple2<String, String> call(String s) {
        return new scala.Tuple2<String, String>(s, s);
    }
});

Later on, this empty stream is unified (via union()) with stream1, and finally, after the join, I add the leftover keys from stream1 and call window(). The same happens with stream2.

The problem is that the operations that generate left_keys_s1 and left_keys_s2 are transformations without actions, which means that Spark doesn't create any RDD flow graph and, hence, they are never executed. What I get right now is a join that outputs only the records whose keys are in stream1 and stream2 in the same time interval.

Do you have any suggestions on how to implement this correctly with Spark?

Thanks, Marco



1 Answer

It should be possible to carry over values from one batch to the next by keeping a reference to the RDD where those values are held.

Don't try to merge the streams using queueStream(); instead, declare a mutable RDD reference that can be updated at each streaming interval.

This is an example:

In this streaming job, we start with an RDD carrying 100 integers. At each interval, 10 random numbers are generated and subtracted from those initial 100 integers. This process continues until the initial RDD with 100 elements is empty. This example shows how to carry over elements from one interval to the next.

  import scala.util.Random
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.ConstantInputDStream

  val ssc = new StreamingContext(sc, Seconds(2)) // sc = SparkContext

  // mutable reference to the RDD that is carried over from one interval to the next
  var targetInts: RDD[Int] = sc.parallelize(0 until 100)

  var loops = 0

  // an RDD of functions that generate random data;
  // evaluating this RDD at each interval produces new random data points
  val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))

  val dstream = new ConstantInputDStream(ssc, randomDataRdd)

  // at each interval, evaluate the random functions and subtract the resulting
  // values from the carried-over RDD, then reassign the reference
  dstream.foreachRDD { rdd =>
    loops += 1
    val randomInts = rdd.map(f => f())
    targetInts = targetInts.subtract(randomInts)
    if (targetInts.isEmpty) { println(loops); ssc.stop(false) }
  }

  ssc.start()
Running this example and plotting loops against targetInts.count gives the following chart:

[Chart: removing 100 ints by generating random numbers]

I hope this gives you enough guidance to implement the complete use case.
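
To connect this back to the question, the same mutable-reference pattern could look roughly as follows for the two file streams and their leftover keys. This is only an untested sketch: the HDFS paths, the comma-separated key,value line format, and the 30-second batch interval are assumptions, and the leftover state is kept in driver-side RDD references exactly like targetInts above.

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(30))   // batch interval is an assumption

  // mutable references holding the keys left over from previous intervals
  var leftKeysS1: RDD[(String, String)] = sc.emptyRDD[(String, String)]
  var leftKeysS2: RDD[(String, String)] = sc.emptyRDD[(String, String)]

  // assumed "key,value" line format; adapt the parsing to your data
  def parse(line: String): (String, String) = {
    val Array(k, v) = line.split(",", 2)
    (k, v)
  }

  val stream1 = ssc.textFileStream("hdfs:///in/stream1").map(parse)   // hypothetical paths
  val stream2 = ssc.textFileStream("hdfs:///in/stream2").map(parse)

  val joined = stream1.transformWith(stream2,
    (rdd1: RDD[(String, String)], rdd2: RDD[(String, String)]) => {
      val s1 = rdd1.union(leftKeysS1).cache()
      val s2 = rdd2.union(leftKeysS2).cache()
      val out = s1.join(s2).cache()
      // keys that did not join are carried over to the next interval
      leftKeysS1 = s1.subtractByKey(out)
      leftKeysS2 = s2.subtractByKey(out)
      out
    })

  joined.map { case (k, (v1, v2)) => s"$k,$v1,$v2" }.saveAsTextFiles("hdfs:///out/joined")

  ssc.start()
  ssc.awaitTermination()

As with targetInts above, the lineage of leftKeysS1 and leftKeysS2 grows with every batch, so a long-running job would also want to cache and periodically checkpoint those RDDs to keep recomputation bounded.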

maasg