So basically what I want is to run ML pipelines in parallel. I have been using scikit-learn, and I decided to use DaskGridSearchCV.
I have a list of gridSearchCV = DaskGridSearchCV(pipeline, grid, scoring=evaluator) objects, and I run each of them sequentially:
for gridSearchCV in list:
    gridSearchCV.fit(train_data, train_target)
    predicted = gridSearchCV.predict(test_data)
If I have N different GridSearch objects, I want to take advantage of all the available resources as much as possible. If there are enough resources to run 2, 3, 4, ... or N of them at the same time in parallel, I want to do so.
So I started trying a few things based on dask's documentation. First I tried dask.threaded and dask.multiprocessing, but it ends up being slower, and I keep getting:
/Library/Python/2.7/site-packages/sklearn/externals/joblib/parallel.py:540: UserWarning: Multiprocessing backed parallel loops cannot be nested below threads, setting n_jobs=1
This is the code snippet:
def run_pipeline(self, gs, data):
    train_data, test_data, train_target, expected = train_test_split(
        data, target, test_size=0.25, random_state=33)
    model = gs.fit(train_data, train_target)
    predicted = gs.predict(test_data)

values = [delayed(run_pipeline)(gs, df) for gs in gs_list]
compute(*values, get=dask.threaded.get)
Maybe I am approaching this the wrong way, would you have any suggestions for me?
Yes, but I have a list of GridSearch objects, for example one using DecisionTree and another using RandomForest, and I want to run them in parallel as long as there are resources for it.
If this is your goal, I would merge them all into the same grid. Scikit-learn Pipelines support grid search across steps, which allows you to do your search with only a single GridSearchCV object (for an example of this from the scikit-learn docs, see here). If you only have a single estimator (instead of a pipeline), you can use a Pipeline with a single step as a proxy. For example:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
import dask_searchcv as dcv

pipeline = Pipeline([('est', DecisionTreeClassifier())])
grid = [
    {'est': [DecisionTreeClassifier()],
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for DecisionTreeClassifier
     },
    {'est': [RandomForestClassifier()],
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for RandomForestClassifier
     },
    # more estimator/parameter subsets
]
gs = dcv.GridSearchCV(pipeline, grid)
gs.fit(train_data, train_target)
gs.predict(test_data)
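To show that this pipeline/grid pattern behaves as described, here is a minimal runnable sketch using scikit-learn's own GridSearchCV on a toy dataset (dask_searchcv's GridSearchCV shares the same interface, so the grid definition carries over unchanged; the toy data and cv=3 are assumptions for illustration):

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

# toy data standing in for the question's train/test split
X, y = make_classification(n_samples=200, random_state=0)
train_data, test_data, train_target, expected = train_test_split(
    X, y, test_size=0.25, random_state=33)

# one pipeline whose single step is swapped out by the grid
pipeline = Pipeline([('est', DecisionTreeClassifier())])
grid = [
    {'est': [DecisionTreeClassifier()],
     'est__max_features': ['sqrt', 'log2']},
    {'est': [RandomForestClassifier(n_estimators=10)],
     'est__max_features': ['sqrt', 'log2']},
]

gs = GridSearchCV(pipeline, grid, cv=3)
gs.fit(train_data, train_target)
predicted = gs.predict(test_data)

# best_params_ records which estimator/parameter combination won
print(gs.best_params_['est'])
```

After fitting, gs.best_params_ tells you which of the candidate estimators was selected, so the single merged search replaces the list of separate GridSearch objects.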
Note that for this specific case (where all estimators share the same parameters), you can merge the grid:

grid = {'est': [DecisionTreeClassifier(), RandomForestClassifier()],
        'est__max_features': ['sqrt', 'log2'],
        # more parameters shared by all estimators
        }
As far as why your delayed example didn't work: dask.delayed is for wrapping functions that don't call dask code. Since you're calling fit on a dask_searchcv.GridSearchCV object (which uses dask to compute) inside the delayed function (which also uses dask to compute), you're nesting calls to the dask scheduler, which can lead to poor performance at best and weird bugs at worst.
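To make the distinction concrete, here is a minimal sketch of what dask.delayed is intended for: wrapping ordinary Python functions that do no dask work themselves, so no scheduler nesting occurs (the `square` function is a hypothetical stand-in, not from the original code):

```python
import dask
from dask import delayed

def square(x):
    # ordinary Python function -- no dask calls inside, so it is
    # safe to wrap in delayed without nesting schedulers
    return x * x

# build a list of lazy tasks, then compute them all in one pass
values = [delayed(square)(i) for i in range(4)]
results = dask.compute(*values)
print(results)  # (0, 1, 4, 9)
```

Because square never calls into dask, the scheduler sees a flat task graph; wrapping a dask-backed fit the same way puts a scheduler inside a scheduler, which is exactly the situation the answer warns against.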