
How to convert a sklearn pipeline into a pyspark pipeline?

We have a machine learning classifier model that we have trained with a pandas dataframe and a standard sklearn pipeline (StandardScaler, RandomForestClassifier, GridSearchCV etc). We are working on Databricks and would like to scale up this pipeline to a large dataset using the parallel computation spark offers.

What is the quickest way to convert our sklearn pipeline into something that computes in parallel? (We can easily switch between pandas and spark DFs as required.)
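
For concreteness, the sklearn side of such a workflow looks roughly like this (a minimal sketch; the features, labels, and parameter grid are hypothetical placeholders):

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# chain scaling and the classifier, then tune with 5-fold cross-validation
pipe = Pipeline([("scale", StandardScaler()),
                 ("rf", RandomForestClassifier())])
param_grid = {"rf__n_estimators": [50, 100], "rf__max_depth": [5, 10]}
search = GridSearchCV(pipe, param_grid, cv=5)
# search.fit(X, y)  # X, y: features and labels from the pandas DataFrame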

For context, our options seem to be:

  1. Rewrite the pipeline using Spark MLlib (time-consuming; see the sketch after this list)
  2. Use a sklearn-spark bridging library
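
To give a sense of what option 1 involves, here is a minimal sketch of a comparable MLlib pipeline; the column names and parameter grid are hypothetical placeholders:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# assemble raw columns into a vector, scale it, then classify
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, scaler, rf])

grid = (ParamGridBuilder()
        .addGrid(rf.numTrees, [50, 100])
        .addGrid(rf.maxDepth, [5, 10])
        .build())
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
                    numFolds=5)
# cv_model = cv.fit(spark_df)  # spark_df: a Spark DataFrame with f1, f2, label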

On option 2, Spark-Sklearn seems to be deprecated, but Databricks instead recommends that we use joblibspark. However, the following code raises an exception on Databricks:

from sklearn import svm, datasets
from sklearn.model_selection import GridSearchCV
from joblibspark import register_spark
from sklearn.utils import parallel_backend
register_spark() # register spark backend

iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC(gamma='auto')

clf = GridSearchCV(svr, parameters, cv=5)
with parallel_backend('spark', n_jobs=3):
    clf.fit(iris.data, iris.target)

raises

py4j.security.Py4JSecurityException: Method public int org.apache.spark.SparkContext.maxNumConcurrentTasks() is not whitelisted on class class org.apache.spark.SparkContext
asked Sep 01 '20 by anonuser9674123


People also ask

Can we use Sklearn in PySpark?

No, not directly: scikit-learn is a package designed to run on a single machine, whereas Spark is a distributed computing environment. A bridging library such as joblibspark is needed to distribute scikit-learn workloads across a cluster.

Can I use scikit-learn in spark?

In addition to distributing ML tasks in Python across a cluster, the scikit-learn integration package for Spark provides tools to move data between Spark and Python, including methods to convert Spark DataFrames to pandas DataFrames and NumPy arrays.
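
As an illustration, moving data between the two is a one-liner in each direction (a sketch assuming spark_df is an existing Spark DataFrame and spark is an active SparkSession):

pdf = spark_df.toPandas()               # Spark DataFrame -> pandas DataFrame
arr = pdf.to_numpy()                    # pandas DataFrame -> NumPy array
spark_df2 = spark.createDataFrame(pdf)  # pandas DataFrame -> Spark DataFrame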

How do you save the pipeline model on PySpark?

You can save a fitted pipeline model with model.save("/tmp/rf") and load it back later with PipelineModel.load("/tmp/rf").
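
A minimal sketch of saving and reloading a fitted pipeline model (the pipeline, column names, and train_df are hypothetical):

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# train_df: a hypothetical Spark DataFrame with columns f1, f2 and label
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
model = Pipeline(stages=[assembler, rf]).fit(train_df)

model.save("/tmp/rf")                   # persist the fitted PipelineModel
loaded = PipelineModel.load("/tmp/rf")  # reload it later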

What is a PySpark pipeline?

A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator . These stages are run in order, and the input DataFrame is transformed as it passes through each stage.


1 Answer

According to the Databricks instructions (here and here), the necessary requirements are:

  • Python 3.6+
  • pyspark>=2.4
  • scikit-learn>=0.21
  • joblib>=0.14
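
If joblibspark is not already available on the cluster, it can be installed from a notebook cell (assuming a Databricks notebook environment):

%pip install joblibspark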

I cannot reproduce your issue in a community Databricks cluster running Python 3.7.5, Spark 3.0.0, scikit-learn 0.22.1, and joblib 0.14.1:

import sys
import sklearn
import joblib

spark.version
# '3.0.0'

sys.version
# '3.7.5 (default, Nov  7 2019, 10:50:52) \n[GCC 8.3.0]'

sklearn.__version__
# '0.22.1'

joblib.__version__
# '0.14.1'

With the above settings, your code snippet runs smoothly and indeed produces a fitted classifier clf:

GridSearchCV(cv=5, error_score=nan,
             estimator=SVC(C=1.0, break_ties=False, cache_size=200,
                           class_weight=None, coef0=0.0,
                           decision_function_shape='ovr', degree=3,
                           gamma='auto', kernel='rbf', max_iter=-1,
                           probability=False, random_state=None, shrinking=True,
                           tol=0.001, verbose=False),
             iid='deprecated', n_jobs=None,
             param_grid={'C': [1, 10], 'kernel': ('linear', 'rbf')},
             pre_dispatch='2*n_jobs', refit=True, return_train_score=False,
             scoring=None, verbose=0)
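
Once fitted, the usual GridSearchCV attributes are available for inspection, for example:

clf.best_params_  # the winning parameter combination
clf.best_score_   # its mean cross-validated score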

The alternative example from here runs smoothly as well:

from sklearn.utils import parallel_backend
from sklearn.model_selection import cross_val_score
from sklearn import datasets
from sklearn import svm
from joblibspark import register_spark

register_spark() # register spark backend

iris = datasets.load_iris()
clf = svm.SVC(kernel='linear', C=1)
with parallel_backend('spark', n_jobs=3):
    scores = cross_val_score(clf, iris.data, iris.target, cv=5)

print(scores)

giving

[0.96666667 1.         0.96666667 0.96666667 1.        ]
answered Nov 15 '22 by desertnaut