I have this code in Scala:
class SparkStreamingService(...) {
  val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

  lines.foreachRDD { rdd =>
    val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
    if (!df.rdd.isEmpty()) {
      processDataFrameToLoadService(df)
    } else {
      throw new BacSparkStreamingExpception("The dataframe created from kafka message is empty")
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
Is there a way to mock InputDStream in Java? How can I go about unit testing this? Basically, I want to mock cassandraSQLContext.read.json(rdd.map(x => x._2)) and return a custom DataFrame (which I can do), but how do I get past the first three lines? Currently the stream just polls for messages and the test never finishes.
True unit testing with any Spark context is all but impossible. I think you should instead write integration tests against in-memory values:
import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
// Each RDD pushed onto this queue becomes one batch of the test stream.
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream[(String, String)] = streamingContext.queueStream(strings)
strings += rdd
...and go from there.
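To drive that queue-backed stream through your actual logic, the Kafka-specific stream creation needs to be separated from the processing. Here is a minimal sketch of that idea; processStream is a hypothetical method name I'm inventing for the body of your foreachRDD, and everything inside it is lifted from your snippet:

import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.dstream.DStream

// Hypothetical extraction: the processing accepts any DStream, so a test can
// hand it the queueStream while production hands it the real Kafka stream.
def processStream(stream: DStream[(String, String)]): Unit =
  stream.foreachRDD { rdd =>
    val df: DataFrame = cassandraSQLContext.read.json(rdd.map(x => x._2))
    if (!df.rdd.isEmpty()) {
      processDataFrameToLoadService(df)
    } else {
      throw new BacSparkStreamingExpception("The dataframe created from kafka message is empty")
    }
  }

// In the test:
processStream(dStream)
streamingContext.start()
streamingContext.awaitTerminationOrTimeout(5000) // bounded wait so the test can finish

Using awaitTerminationOrTimeout rather than awaitTermination is what keeps the test from polling forever the way your current code does.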
You also want to initialize as much of the Spark machinery as possible in some sort of "setup" or "before" hook provided by your testing framework, and similarly dismantle it in a "tear down" or "after" hook.
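For example, with ScalaTest's BeforeAndAfterAll (a sketch; the suite name, master URL, and batch interval are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SparkStreamingServiceSpec extends FunSuite with BeforeAndAfterAll {
  @transient private var sparkContext: SparkContext = _
  @transient private var streamingContext: StreamingContext = _

  override def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-test")
    sparkContext = new SparkContext(conf)
    streamingContext = new StreamingContext(sparkContext, Seconds(1))
  }

  override def afterAll(): Unit = {
    // Stop the streaming context first, then the underlying SparkContext.
    if (streamingContext != null) streamingContext.stop(stopSparkContext = false)
    if (sparkContext != null) sparkContext.stop()
  }
}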
While these are not strictly unit tests, they will give you confidence that your processing code can handle streams, and if things go bad once the source of the stream becomes real (i.e. Kafka), you can look at the Kafka side of things for the cause.