I'm running Spark Streaming with two different windows (one window for training a model with scikit-learn and the other for predicting values based on that model), and I'm wondering how I can let one window (the "slow" training window) train a model without "blocking" the "fast" prediction window.
My simplified code looks as follows:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
import numpy as np
from sklearn.svm import SVR
import Custom_ModelContainer

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
### Window 1 ###
### predict data based on the model computed in window 2 ###

def predict(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc. ...
        # regular python code
        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        pred = Custom_ModelContainer.getmodel().predict(X)
        # send prediction to GUI
    except Exception as e:
        print(e)

predictionStream = stream.window(60, 60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit a new model ###

def trainModel(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc. ...
        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        y = np.array(df.map(lambda lp: lp.label).collect())
        # train/test split etc. ...
        model = SVR().fit(X_train, y_train)
        Custom_ModelContainer.setModel(model)
    except Exception as e:
        print(e)

modelTrainingStream = stream.window(600, 600)
modelTrainingStream.foreachRDD(trainModel)
(Note: The Custom_ModelContainer is a class I wrote to save and retrieve the trained model)
My setup generally works fine, with the exception that every time a new model is trained in the second window (which takes about a minute), the first window does not compute predictions until model training is finished. Actually, I guess this makes sense, since model fitting and predictions are both computed on the master node (in a non-distributed setting, due to scikit-learn).
So my question is the following: Would it be possible to train the model on a single worker node (instead of the master node)? If so, how could I achieve the latter and would that actually resolve my issue?
If not, any other suggestion on how I could make such a setup work without delaying computations in window 1?
Any help is greatly appreciated.
EDIT: I guess the more general question would be: How can I run two different tasks on two different workers in parallel?
Disclaimer: This is only a set of ideas. None of these has been tested in practice.
A couple of things you can try:
Don't collect to predict. scikit-learn models are typically serializable, so the prediction process can easily be handled on the cluster:
def predict(time, rdd):
    ...
    model = Custom_ModelContainer.getmodel()
    pred = (df.rdd.map(lambda lp: lp.features.toArray())
              .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
    ...
This should not only parallelize predictions but also, if the raw data is not passed to the GUI, reduce the amount of data that has to be collected.
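If the same model is reused across many tasks, it may also be worth broadcasting it instead of capturing it in each closure. A minimal sketch, assuming the trained model is picklable (the broadcast handling below is an addition, not part of the original answer):

def predict(time, rdd):
    ...
    # ship the current model to the executors once for this batch
    model_bc = sc.broadcast(Custom_ModelContainer.getmodel())
    pred = (df.rdd.map(lambda lp: lp.features.toArray())
              .mapPartitions(lambda it: model_bc.value.predict(np.array(list(it)))))
    ...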
Try to collect and send data asynchronously. PySpark doesn't provide a collectAsync method, but you can try to achieve something similar with concurrent.futures:
from pyspark.rdd import RDD
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def submit_to_gui(*args): ...

def submit_if_success(f):
    if not f.exception():
        executor.submit(submit_to_gui, f.result())
and continue from point 1:
def predict(time, rdd):
    ...
    f = executor.submit(RDD.collect, pred)
    f.add_done_callback(submit_if_success)
    ...
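Put together, the prediction handler could look roughly like this (a sketch that simply combines the two fragments above; the feature extraction step is still elided, as in the question):

def predict(time, rdd):
    # ... rdd conversion to df, feature extraction etc. ...
    model = Custom_ModelContainer.getmodel()
    # distributed prediction (point 1)
    pred = (df.rdd.map(lambda lp: lp.features.toArray())
              .mapPartitions(lambda it: model.predict(np.array(list(it)))))
    # asynchronous collect and hand-off to the GUI (point 2)
    f = executor.submit(RDD.collect, pred)
    f.add_done_callback(submit_if_success)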
If you really want to use a local scikit-learn model, try to collect and fit using futures as above. You can also try to collect only once, especially if the data is not cached:
def collect_and_train(df):
    y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
    ...
    return SVR().fit(X_train, y_train)

def set_if_success(f):
    if not f.exception():
        Custom_ModelContainer.setModel(f.result())

def trainModel(time, rdd):
    ...
    f = executor.submit(collect_and_train, df)
    f.add_done_callback(set_if_success)
    ...
Move the training process to the cluster, either using an already existing solution like spark-sklearn or a custom approach: coalesce(1) and train a single model using mapPartitions, or train a model per partition using mapPartitions, then collect the models and use them as an ensemble, for example by taking an average or median prediction.

Throw away scikit-learn and use a model which can be trained and maintained in a distributed, streaming environment (for example StreamingLinearRegressionWithSGD).
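To illustrate the last option, MLlib's streaming regressor can be trained and queried continuously on DStreams. A minimal sketch, assuming each input line is a comma-separated "label,feature1,feature2,feature3" record with three features (the parsing and dimensionality are assumptions, not part of the original answer):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

# assumed input format: "label,f1,f2,f3" -- adjust to the real stream contents
def parse(line):
    values = [float(x) for x in line.split(",")]
    return LabeledPoint(values[0], Vectors.dense(values[1:]))

training = stream.window(600, 600).map(parse)
testing = stream.window(60, 60).map(lambda line: parse(line).features)

model = StreamingLinearRegressionWithSGD(stepSize=0.01, numIterations=50)
model.setInitialWeights([0.0, 0.0, 0.0])  # must match the assumed feature dimensionality
model.trainOn(training)
model.predictOn(testing).pprint()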
Your current approach makes Spark redundant. If you can train the model locally, there is a good chance that you can perform all the other tasks much faster on the local machine. Otherwise your program will simply fail on collect.
I think what you're looking for is the property "spark.streaming.concurrentJobs", which defaults to 1. Increasing it should allow you to run multiple foreachRDD functions in parallel.
In JobScheduler.scala:
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
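For example, it can be set on the SparkConf before the StreamingContext is created (the value 2 below is just an illustration):

conf = SparkConf()
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "2")  # allow two streaming jobs to run concurrently
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)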
Just a reminder to also be aware of thread safety on your custom model container if you're going to be mutating and reading in parallel. :)
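For instance, a lock-protected module-level container along those lines could look like this (a hypothetical sketch, not the asker's actual Custom_ModelContainer):

# Custom_ModelContainer.py (hypothetical)
import threading

_lock = threading.Lock()
_model = None

def setModel(model):
    global _model
    with _lock:
        _model = model

def getmodel():
    with _lock:
        return _model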