How to avoid one Spark Streaming window blocking another window with both running some native Python code

I'm running Spark Streaming with two different windows (one window for training a model with SKLearn and the other for predicting values based on that model), and I'm wondering how I can prevent the "slow" training window from blocking the "fast" prediction window while a model is being trained.
My simplified code looks as follows:

import numpy as np
from sklearn.svm import SVR

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

stream = ssc.socketTextStream("localhost", 7000)


import Custom_ModelContainer

### Window 1 ###
### predict data based on model computed in window 2 ###

def predict(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc...

        # regular python code
        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        pred = Custom_ModelContainer.getmodel().predict(X)

        # send prediction to GUI

    except Exception as e:
        print(e)

predictionStream = stream.window(60, 60)
predictionStream.foreachRDD(predict)


### Window 2 ###
### fit new model ###

def trainModel(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc...

        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        y = np.array(df.map(lambda lp: lp.label).collect())

        # train test split etc...

        model = SVR().fit(X_train, y_train)
        Custom_ModelContainer.setModel(model)

    except Exception as e:
        print(e)

modelTrainingStream = stream.window(600, 600)
modelTrainingStream.foreachRDD(trainModel)

(Note: The Custom_ModelContainer is a class I wrote to save and retrieve the trained model)
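A minimal sketch of what such a container could look like (the real implementation is not shown here, and whether it is a module or a class doesn't matter; only the getmodel/setModel interface used above does):

    class Custom_ModelContainer(object):
        # Holds the most recently trained model so the
        # prediction window can pick it up.
        _model = None

        @staticmethod
        def setModel(model):
            Custom_ModelContainer._model = model

        @staticmethod
        def getmodel():
            return Custom_ModelContainer._model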

My setup generally works fine, with the exception that every time a new model is trained in the second window (which takes about a minute), the first window does not compute predictions until model training is finished. Actually, I guess this makes sense, since model fitting and prediction are both computed on the master node (in a non-distributed setting, due to SKLearn).

So my question is the following: Would it be possible to train the model on a single worker node (instead of the master node)? If so, how could I achieve that, and would it actually resolve my issue?

If not, any other suggestion on how I could make such a setup work without delaying computations in window 1?

Any help is greatly appreciated.

EDIT: I guess the more general question would be: How can I run two different tasks on two different workers in parallel?

Asked by Kito on Jan 27 '16.

2 Answers

Disclaimer: This is only a set of ideas. None of these has been tested in practice.


A couple of things you can try:

  1. Don't collect to predict. scikit-learn models are typically serializable, so the prediction process can easily be handled on the cluster:

    def predict(time, rdd):
        ... 
    
        model = Custom_ModelContainer.getmodel()
        pred = (df.rdd.map(lambda lp: lp.features.toArray())
            .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
        ...
    

    This should not only parallelize predictions but also, if the raw data is not passed to the GUI, reduce the amount of data that has to be collected.

  2. Try to collect and send data asynchronously. PySpark doesn't provide a collectAsync method, but you can achieve something similar with concurrent.futures:

    from pyspark.rdd import RDD
    from concurrent.futures import ThreadPoolExecutor
    
    executor = ThreadPoolExecutor(max_workers=4)
    
    def submit_to_gui(*args): ...
    
    def submit_if_success(f):
        if not f.exception():
            executor.submit(submit_to_gui, f.result())
    

    Then, continuing from point 1:

    def predict(time, rdd):
        ...
        f = executor.submit(RDD.collect, pred)
        f.add_done_callback(submit_if_success)
        ...
    
  3. If you really want to use a local scikit-learn model, try to collect and fit using futures as above. You can also try to collect only once, especially if the data is not cached:

    def collect_and_train(df):
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
        ...
        return SVR().fit(X_train, y_train)
    
    def set_if_success(f):
        if not f.exception():
            Custom_ModelContainer.setModel(f.result())  
    
    def trainModel(time, rdd):
        ...
        f = executor.submit(collect_and_train, df)
        f.add_done_callback(set_if_success)
        ...
    
  4. Move the training process to the cluster, either using existing solutions like spark-sklearn or a custom approach:

    • naive solution - prepare your data, coalesce(1) and train a single model using mapPartitions.
    • distributed solution - create and validate a separate model per partition using mapPartitions, collect the models and use them as an ensemble, for example by taking the average or median prediction (see the sketch after this list).
  5. Throw away scikit-learn and use a model which can be trained and maintained in a distributed, streaming environment (for example StreamingLinearRegressionWithSGD); a minimal sketch also follows below.

    Your current approach makes Spark largely redundant: if you can train the model locally, there is a good chance you can perform all the other tasks much faster on the local machine as well. Otherwise your program will simply fail on collect.
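As a rough illustration of the distributed variant from point 4, one model could be fit per partition and the results combined at prediction time. This is only a sketch under the assumption that the features have already been extracted into an RDD of LabeledPoints (called labeled_points here); train_partition and ensemble_predict are hypothetical helpers, not part of the question's code:

    import numpy as np
    from sklearn.svm import SVR

    def train_partition(iterator):
        # Fit one SVR on the data of a single partition.
        points = list(iterator)
        if not points:
            return []
        X = np.array([lp.features.toArray() for lp in points])
        y = np.array([lp.label for lp in points])
        return [SVR().fit(X, y)]

    # One model per partition, collected back to the driver.
    models = labeled_points.mapPartitions(train_partition).collect()

    def ensemble_predict(x):
        # Combine the per-partition models by taking the median prediction.
        return np.median([m.predict(x.reshape(1, -1))[0] for m in models])

And for point 5, a minimal sketch of the streaming MLlib approach, assuming the socket stream can be parsed into DStreams of LabeledPoints (parsing omitted, num_features assumed known):

    from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

    # training_stream and prediction_stream are assumed to be
    # DStreams of LabeledPoint.
    model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=50)
    model.setInitialWeights([0.0] * num_features)

    model.trainOn(training_stream)
    predictions = model.predictOnValues(
        prediction_stream.map(lambda lp: (lp.label, lp.features)))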

Answered by zero323.

I think what you're looking for is the property "spark.streaming.concurrentJobs", which defaults to 1. Increasing it should allow you to run multiple foreachRDD functions in parallel.

In JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
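In the question's setup that would mean setting the property on the SparkConf before the StreamingContext is created. A small sketch (the value 2 is just an assumption, chosen so the two foreachRDD jobs can overlap):

    conf = SparkConf()
    conf.setMaster("local[4]")
    # Allow up to two streaming output operations to run concurrently.
    conf.set("spark.streaming.concurrentJobs", "2")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)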

Just a reminder to also be aware of thread safety on your custom model container if you're going to be mutating and reading in parallel. :)
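For instance, the container sketched in the question could guard the model reference with a lock; this is a hypothetical variant, not the asker's actual code:

    import threading

    class Custom_ModelContainer(object):
        # Thread-safe variant: the training and prediction jobs may
        # now read and replace the model concurrently.
        _lock = threading.Lock()
        _model = None

        @staticmethod
        def setModel(model):
            with Custom_ModelContainer._lock:
                Custom_ModelContainer._model = model

        @staticmethod
        def getmodel():
            with Custom_ModelContainer._lock:
                return Custom_ModelContainer._model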

Answered by Hamel Kothari.