
Apply sklearn trained model on a dataframe with PySpark

I trained a random forest model with scikit-learn in Python and would like to apply it to a big dataset with PySpark.

I first loaded the trained sklearn RF model (with joblib), loaded my data that contains the features into a Spark dataframe, and then added a column with the predictions, using a user-defined function like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def predictClass(features):
    # rf is the sklearn model loaded with joblib; predict expects a 2D input
    return str(rf.predict([features])[0])

udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))

It takes a very long time to run, though. Is there a more efficient way to do the same thing (without using Spark ML)?

asked May 31 '17 by Pierre


People also ask

Can you use Scikit learn with PySpark?

No, scikit-learn does not work with PySpark out of the box: scikit-learn is a package designed to run on a single machine, whereas Spark is a distributed environment.

Can you use Scikit learn with spark?

In addition to distributing ML tasks in Python across a cluster, the scikit-learn integration package for Spark provides additional tools to export data from Spark to Python and vice versa. You can find methods to convert Spark DataFrames to pandas DataFrames and NumPy arrays.

Can I use PySpark instead of Pandas?

Put simply, pandas runs operations on a single machine, whereas PySpark runs on multiple machines. If you are working on a machine learning application that deals with larger datasets, PySpark is a better fit, since it can process operations many times (up to 100x) faster than pandas.

Can Pandas DataFrame convert to DataFrame PySpark?

Spark provides a createDataFrame(pandas_dataframe) method to convert a pandas DataFrame to a Spark DataFrame; by default, Spark infers the schema by mapping the pandas data types to PySpark data types. If you want all columns treated as strings, use spark.createDataFrame(pandasDF.astype(str)).
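
For illustration, a small sketch of both directions of that conversion; spark is assumed to be an existing SparkSession and the sample data is made up:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

pandas_df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

spark_df = spark.createDataFrame(pandas_df)                   # schema inferred from pandas dtypes
spark_df_str = spark.createDataFrame(pandas_df.astype(str))   # force every column to string

back_to_pandas = spark_df.toPandas()                          # collect back into pandas on the driver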


2 Answers

I had to do the same thing in a recent project. The bad thing about applying a UDF to each row is that PySpark has to load the sklearn model each time, which is why it takes ages to finish. The best solution I have found was to use the .mapPartitions or foreachPartition method on the RDD; a really good explanation is here:

https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md

It runs fast because it ensures there is no shuffling, and for each partition PySpark has to load the model and call predict only once. So the flow would be (a sketch follows the list):

  • convert the DataFrame to an RDD
  • broadcast the model to the nodes so it is accessible to the workers
  • write a function that takes an iterator (which yields all rows within a partition) as its argument
  • iterate through the rows and build a proper matrix with your features (order matters)
  • call .predict only once per partition
  • return the predictions
  • transform the RDD back to a DataFrame if needed
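
A minimal sketch of that flow, assuming the dataframe and 'features' column from the question, that the column holds a plain numeric list per row (not a Spark ML Vector), and a hypothetical model file name:

import joblib
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rf = joblib.load("rf_model.pkl")      # hypothetical path to the trained sklearn model
broadcast_rf = sc.broadcast(rf)       # ship the model to each executor once

def predict_partition(rows):
    rows = list(rows)                 # materialise the rows of this partition
    if not rows:
        return
    # build one feature matrix for the whole partition (column order matters)
    X = [row["features"] for row in rows]
    preds = broadcast_rf.value.predict(X)   # a single .predict call per partition
    for row, pred in zip(rows, preds):
        yield Row(**row.asDict(), prediction=str(pred))

predicted_rdd = dataframe.rdd.mapPartitions(predict_partition)
new_dataframe = spark.createDataFrame(predicted_rdd)

Batching the rows per partition and broadcasting the model is what removes the per-row deserialization and per-row predict overhead.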
answered Sep 23 '22 by Jacek Placek


A sklearn RF model can be quite large when pickled. It is possible that frequent pickling/unpickling of the model during task dispatch is causing the problem. You could consider using broadcast variables.

From the official documentation:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
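
For example, the original UDF approach could be kept but with the model wrapped in a broadcast variable. A hedged sketch, where sc is assumed to be the SparkContext and the model path and column names come from the question or are made up:

import joblib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

rf = joblib.load("rf_model.pkl")       # hypothetical path to the trained model
broadcast_rf = sc.broadcast(rf)        # cached read-only on each executor

def predictClass(features):
    # use the cached copy instead of shipping the pickled model with every task
    return str(broadcast_rf.value.predict([features])[0])

udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))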

answered Sep 23 '22 by peter