Combining Spark Streaming + MLlib

I'm trying to use a Random Forest model to classify a stream of examples, but it appears that I cannot apply the model to them. Here is the PySpark code:

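from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.tree import RandomForest

sc = SparkContext(appName="App")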

model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', numTrees=150)


ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(hostname, int(port))

parsedLines = lines.map(parse)
parsedLines.pprint()

predictions = parsedLines.map(lambda event: model.predict(event.features))

and this is the error returned when running it on the cluster:

    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Is there a way to use a model trained on static data to make predictions on streaming examples?

Thanks, I really appreciate it!

testing asked Apr 25 '16 10:04


1 Answer

Yes, you can use a model trained on static data. The problem you are experiencing is not related to streaming at all. You simply cannot use a JVM-based model inside an action or a transformation (see How to use Java/Scala function from an action or a transformation? for an explanation of why). Instead, apply the predict method to a complete RDD, for example using transform on a DStream:

from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from operator import attrgetter


sc = SparkContext("local[2]", "foo")
ssc = StreamingContext(sc, 1)

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
trainingData, testData = data.randomSplit([0.7, 0.3])

model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3
)

(ssc
    .queueStream([testData])
    # Extract features
    .map(attrgetter("features"))
    # Predict 
    .transform(lambda _, rdd: model.predict(rdd))
    .pprint())

ssc.start()
ssc.awaitTerminationOrTimeout(10)
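
To see why the original map(lambda event: model.predict(...)) fails, a plain-Python analogy may help. This is only a sketch and does not use Spark. DriverOnlyHandle and WrappedModel are hypothetical stand-ins for SparkContext and for the Python wrapper around a JVM model, and pickle stands in for the serializer that ships closures to workers. The assumption is that the wrapper keeps a reference to a driver-only object, so anything that captures the wrapper cannot be serialized:

```python
import pickle


class DriverOnlyHandle:
    """Hypothetical stand-in for a driver-side object such as SparkContext."""

    def __reduce__(self):
        # Refuse to be serialized, as a driver-only object would.
        raise RuntimeError("driver-only handle cannot be sent to workers")


class WrappedModel:
    """Hypothetical stand-in for a Python wrapper around a JVM model."""

    def __init__(self):
        # The wrapper holds on to the driver-only handle.
        self._handle = DriverOnlyHandle()


def can_ship(obj):
    """Return True if obj can be pickled, i.e. 'shipped to a worker' in this analogy."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


features = [0.5, 1.5]   # plain data serializes without trouble
model = WrappedModel()  # the wrapper drags the handle along with it

print(can_ship(features))  # True
print(can_ship(model))     # False
```

In the code above the model never has to be serialized: the function passed to transform is called on the driver once per batch, and model.predict(rdd) is applied there to the whole RDD.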

zero323 answered Sep 30 '22 07:09