I have a large set of sklearn pipelines that I'd like to build in parallel with Dask. Here's a simple but naive sequential approach:
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)
pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])
pipelines = [pipe_nb, pipe_lr, pipe_rf] # In reality, this would include many more different types of models with varying but specific parameters
for pl in pipelines:
    pl.fit(X_train, Y_train)
Note that this is not a GridSearchCV or RandomizedSearchCV problem.
In the case of RandomizedSearchCV, I know how to parallelize it with Dask:
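import joblib
import scipy.stats
from dask.distributed import Client
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
dask_client = Client('tcp://some.host.com:8786')  # connect to the running Dask scheduler
clf_rf = RandomForestClassifier()
param_dist = {'n_estimators': scipy.stats.randint(100, 500)}
search_rf = RandomizedSearchCV(
    clf_rf,
    param_distributions=param_dist,
    n_iter=100,
    scoring='f1_macro',  # plain 'f1' only supports binary targets; iris has three classes
    cv=10,
    error_score=0,
    verbose=3,
)
# Route the joblib-based cross-validation fits to the Dask cluster.
with joblib.parallel_backend('dask'):
    search_rf.fit(X_train, Y_train)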
However, I'm not interested in hyperparameter tuning and it isn't clear how to modify this code in order to fit a set of multiple different models with their own specific parameters in parallel with Dask.
Distributed training: scikit-learn can use Dask for parallelism. This lets you train scikit-learn estimators using all the cores of your cluster without significantly changing your code. This is most useful for training large models on medium-sized datasets.
Dask can scale these Joblib-backed algorithms out to a cluster of machines by providing an alternative Joblib backend.
Scikit-learn uses joblib for simple parallelism in many places. Anywhere you pass an n_jobs keyword, scikit-learn is internally calling joblib.
Summary: you can connect joblib to the Dask backend to scale out to a remote cluster for even faster processing times. You can use XGBoost-on-Dask and/or dask-ml for distributed machine learning training on datasets that don't fit into local memory.
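Since joblib is the common layer here, one possible way to fit several unrelated pipelines at once is to hand each fit to joblib directly. The following is only a sketch: `fit_one` is a hypothetical helper, the `max_iter`, `n_estimators`, and `random_state` values are illustrative, and the "threading" backend is used so the snippet runs on a single machine. The assumption is that, with a `dask.distributed` Client already connected, the backend name can be switched to "dask".

```python
import joblib
from sklearn.base import clone
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=0
)

pipelines = [
    Pipeline([("clf", MultinomialNB())]),
    Pipeline([("clf", LogisticRegression(max_iter=1000))]),
    Pipeline([("clf", RandomForestClassifier(n_estimators=50, random_state=0))]),
]


def fit_one(pipeline, X, y):
    # Hypothetical helper: fit a fresh copy so the input object stays unfitted.
    return clone(pipeline).fit(X, y)


# "threading" keeps the sketch local. Assumption: once a dask.distributed
# Client is connected, "dask" can be passed here instead.
with joblib.parallel_backend("threading"):
    fitted = joblib.Parallel(n_jobs=3)(
        joblib.delayed(fit_one)(pl, X_train, Y_train) for pl in pipelines
    )

# Results come back in the same order as the input list.
for pl in fitted:
    print(type(pl.named_steps["clf"]).__name__, round(pl.score(X_test, Y_test), 3))
```

Each task is independent, so the models can differ in type and parameters; nothing here depends on a search object such as RandomizedSearchCV.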
dask.delayed is probably the easiest solution here.
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)
pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])
pipelines = [pipe_nb, pipe_lr, pipe_rf] # In reality, this would include many more different types of models with varying but specific parameters
# Use dask.delayed instead of a for loop.
import dask
pipelines_ = [dask.delayed(pl).fit(X_train, Y_train) for pl in pipelines]
fit_pipelines = dask.compute(*pipelines_)
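If you also want a score for each model, the same pattern can wrap a small function instead of calling `.fit` on a delayed object. This is a minimal sketch, not part of the answer above: `fit_and_score` is a hypothetical helper, and the `max_iter`, `n_estimators`, and `random_state` values are illustrative.

```python
import dask
from sklearn.base import clone
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=0
)

pipelines = [
    Pipeline([("clf", MultinomialNB())]),
    Pipeline([("clf", LogisticRegression(max_iter=1000))]),
    Pipeline([("clf", RandomForestClassifier(n_estimators=50, random_state=0))]),
]


@dask.delayed
def fit_and_score(pipeline, X_tr, y_tr, X_te, y_te):
    # Hypothetical helper: fit a copy and return it with its test accuracy.
    fitted = clone(pipeline).fit(X_tr, y_tr)
    return fitted, fitted.score(X_te, y_te)


# Building the list only records the tasks; nothing runs yet.
tasks = [fit_and_score(pl, X_train, Y_train, X_test, Y_test) for pl in pipelines]

# compute() runs all tasks and returns a tuple of (fitted_pipeline, accuracy) pairs.
results = dask.compute(*tasks)

for fitted, accuracy in results:
    print(type(fitted.named_steps["clf"]).__name__, round(accuracy, 3))
```

Without a cluster, `dask.compute` runs these tasks on a local scheduler. If a `dask.distributed` Client has been created beforehand, the same call is sent to that cluster, and the fitted copies are returned to the local process.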