I trained a random forest model in Python and would like to apply it to a big dataset with PySpark.
I first loaded the trained sklearn RF model (with joblib), loaded the data containing the features into a Spark dataframe, and then added a column with the predictions using a user-defined function like this:
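from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def predictClass(features):
    # rf is the sklearn model loaded with joblib; predict() expects a
    # 2-D input, so wrap the single row and return its one label as a string
    return str(rf.predict([features])[0])

udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))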
It takes a very long time to run, though. Is there a more efficient way to do the same thing (without using Spark ML)?
No, scikit-learn does not work with PySpark out of the box. The reason is that scikit-learn is a package that runs on an individual computer, whereas Spark is a distributed environment.
In addition to distributing ML tasks in Python across a cluster, the scikit-learn integration package for Spark provides additional tools to export data from Spark to Python and vice versa. It includes methods to convert Spark DataFrames to pandas DataFrames and NumPy arrays.
In very simple terms, pandas runs operations on a single machine, whereas PySpark runs on multiple machines. If you are working on a machine learning application that deals with larger datasets, PySpark is a better fit and can process operations many times faster than pandas (figures of up to 100x are sometimes quoted).
Spark provides a createDataFrame(pandas_dataframe) method to convert a pandas DataFrame to a Spark DataFrame. By default, Spark infers the schema by mapping the pandas data types to PySpark data types. If you want all columns as strings, use spark.createDataFrame(pandasDF.astype(str)).
I had to do the same thing in a recent project. The problem with applying a UDF to each row is that PySpark has to read the sklearn model each time, which is why it takes ages to finish. The best solution I found was to use the .mapPartitions or foreachPartition method on the RDD. There is a really good explanation here:
https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md
It is fast because no shuffling is involved and, for each partition, PySpark has to read the model and predict only once. So, the flow would be:
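A minimal sketch of such a per-partition flow, assuming each row carries an id field and a features field, and that the model can be reloaded from a location every executor can read. The names make_partition_predictor and load_model, as well as the column names, are hypothetical and are not taken from the answer or the linked tutorial.

```python
def make_partition_predictor(load_model):
    """Build a function suitable for rdd.mapPartitions.

    load_model is a hypothetical zero-argument callable, for example
    lambda: joblib.load("/path/visible/to/executors/rf.joblib").
    """
    def predict_partition(rows):
        batch = list(rows)              # materialize this partition's rows
        if not batch:                   # empty partitions yield nothing
            return
        model = load_model()            # loaded once per partition, not per row
        features = [list(row["features"]) for row in batch]
        predictions = model.predict(features)   # one call for the whole batch
        for row, label in zip(batch, predictions):
            yield (row["id"], str(label))
    return predict_partition

# Assumed Spark wiring (not executed here):
#   predictor = make_partition_predictor(load_model)
#   new_dataframe = dataframe.rdd.mapPartitions(predictor).toDF(["id", "prediction"])
```

Holding a whole partition in memory is a simplification. If partitions are very large, the rows could be processed in fixed-size chunks instead.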
A sklearn RF model can be quite large when pickled. It is possible that frequent pickling and unpickling of the model during task dispatch causes the problem. You could consider using broadcast variables.
From the official documentation:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
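As a rough sketch of how the broadcast idea could be applied to this question: the model is broadcast once from the driver, and each task reads it from the broadcast handle instead of receiving a copy with the task. The helper name predict_with_broadcast and the layout of the features column are assumptions for illustration; sc.broadcast and the .value attribute are the standard PySpark calls.

```python
def predict_with_broadcast(model_bc):
    """Return a per-row function that reads the model from a broadcast handle.

    model_bc is assumed to be the object returned by sc.broadcast(rf);
    only its .value attribute is used here.
    """
    def predict_one(features):
        model = model_bc.value                       # copy cached on the executor
        label = model.predict([list(features)])[0]   # predict() expects 2-D input
        return str(label)
    return predict_one

# Assumed Spark wiring (not executed here):
#   rf_bc = sc.broadcast(rf)
#   udfFunction = udf(predict_with_broadcast(rf_bc), StringType())
#   new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))
```

This still calls predict once per row, so combining the broadcast with per-partition batching may reduce the overhead further.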