Can I create a model in spark batch and use it on Spark streaming for real-time processing?
I have seen various examples on the Apache Spark site where both training and prediction are built on the same type of processing (linear regression).
Note that Spark Streaming is not true record-by-record real-time processing. The arriving live stream of data is divided into batches of a pre-defined interval, and each batch of data is treated as a Spark Resilient Distributed Dataset (RDD). These RDDs are then processed with operations such as map, reduce, and join.
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results, also in batches. Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data.
Apache Spark, an Apache top-level project since 2014, is an open-source, multi-language data processing engine that lets you implement distributed stream and batch processing over large-scale data workloads.
The batch interval tells Spark for what duration to collect the data: if it is 1 minute, each batch contains the data received during the last minute (source: spark.apache.org). So the data keeps arriving as a continuous sequence of such batches, and this continuous stream of batches is the DStream.
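To make the micro-batch model concrete, here is a minimal sketch (mine, not from the Spark docs or the answers below) of a Java driver that uses a 1-minute batch interval; the app name, host, and port are placeholder assumptions, and the statements would live inside the driver's main method:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf sparkConf = new SparkConf().setAppName("dstream-sketch").setMaster("local[2]");
// Each micro-batch covers 60 seconds of incoming data
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1000));

// Every batch of lines arrives as an RDD wrapped in this DStream
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<Integer> lengths = lines.map(s -> s.length()); // per-record transformation
lengths.print();

jssc.start();            // start receiving data and processing batches
jssc.awaitTermination(); // block until the streaming job is stopped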
Can I create a model in spark batch and use it on Spark streaming for real-time processing?
Of course, yes. In the Spark community this is called offline training, online prediction: many training algorithms in Spark let you save the model to a file system such as HDFS or S3, and the same model can then be loaded by a streaming application. You simply call the model's predict method to make predictions.
See the Streaming + MLlib section in this link.
For example, if you want to train a DecisionTree offline and do predictions online...
In the batch application:
// Train a DecisionTree model on an RDD[LabeledPoint] and save it to a path both jobs can reach
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
In the streaming application:
// Load the previously saved model and call predict on new feature vectors
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel.predict(newData) // newData: a Vector or an RDD[Vector] of features
Here is one more solution, which I just implemented.
I created a model in a Spark batch job. Suppose the final model object is named regmodel:
// algorithm is the configured trainer (e.g. LinearRegressionWithSGD); run() returns the fitted model
final LinearRegressionModel regmodel = algorithm.run(JavaRDD.toRDD(parsedData));
and the Spark context is named sc:
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Now, in the same code, I create a Spark streaming context using the same sc:
// Duration takes the batch interval in milliseconds (read here from the application's own config)
final JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(Integer.parseInt(conf.getWindow().trim())));
and do the prediction like this:
// dist1 is a JavaDStream<LabeledPoint>; for each record, pair the actual label
// with the value predicted by the batch-trained model.
JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception {
        Double p = v1.label();                       // actual label
        Double q = regmodel.predict(v1.features());  // prediction from the batch-trained model
        return new Tuple2<Double, Double>(p, q);
    }
});
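As a follow-up sketch (not part of the original answer), the resulting pair DStream can be printed and the streaming context started so the predictions are actually produced; predictvalue and jssc are the variables defined above:
predictvalue.print();    // print a few (label, prediction) pairs for every batch

jssc.start();            // start consuming the stream and applying the model
jssc.awaitTermination(); // block until the streaming job is stopped
Note that regmodel is captured by the anonymous PairFunction, so it is serialized with the task and shipped to the executors; the batch-trained model therefore has to be serializable, which MLlib's linear models are.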