
Using dask for task scheduling to run machine learning models in parallel

So basically what I want is to run ML Pipelines in parallel. I have been using scikit-learn, and I have decided to use DaskGridSearchCV.

What I have is a list of gridSearchCV = DaskGridSearchCV(pipeline, grid, scoring=evaluator) objects, and I run each of them sequentially:

for gridSearchCV in gs_list:
    gridSearchCV.fit(train_data, train_target)
    predicted = gridSearchCV.predict(test_data)

If I have N different GridSearch objects, I want to make the most of all the available resources. If there are resources to run 2, 3, 4, ... or N of them at the same time in parallel, I want to do so.

So I started trying a few things based on dask's documentation. First I tried dask.threaded and dask.multiprocessing, but each run ends up being slower and I keep getting this warning:

/Library/Python/2.7/site-packages/sklearn/externals/joblib/parallel.py:540: UserWarning: Multiprocessing backed parallel loops cannot be nested below threads, setting n_jobs=1

This is the code snippet:

import dask
import dask.threaded
from dask import compute, delayed
from sklearn.model_selection import train_test_split

def run_pipeline(gs, data, target):
    # split the data, fit the grid search, and score the held-out set
    train_data, test_data, train_target, expected = train_test_split(
        data, target, test_size=0.25, random_state=33)
    gs.fit(train_data, train_target)
    predicted = gs.predict(test_data)

# df and target are the feature data and labels defined elsewhere in the program
values = [delayed(run_pipeline)(gs, df, target) for gs in gs_list]
compute(*values, get=dask.threaded.get)

Maybe I am approaching this the wrong way. Would you have any suggestions for me?

asked Oct 17 '22 by Larissa Leite

1 Answer

> Yes, but I have a list of GridSearch objects, for example one using DecisionTree and another with RandomForest. And I want to run them in parallel as long as there are resources for it.

If this is your goal, I would merge them all into the same grid. Scikit-Learn Pipelines support grid search across steps, which lets you do the whole search with a single GridSearchCV object (the scikit-learn docs include an example of this). If you only have a single estimator (instead of a pipeline), you can use a Pipeline with a single step as a proxy. For example:

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
import dask_searchcv as dcv

pipeline = Pipeline([('est', DecisionTreeClassifier())])

grid = [
    {'est': [DecisionTreeClassifier()],
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for DecisionTreeClassifier, prefixed with 'est__'
    },
    {'est': [RandomForestClassifier()],
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for RandomForestClassifier, prefixed with 'est__'
    },
    # more estimator/parameter subsets
]

gs = dcv.GridSearchCV(pipeline, grid)
gs.fit(train_data, train_target)
gs.predict(test_data)
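
Note that parameters for a pipeline step are addressed with the step name followed by a double underscore, so est__max_features targets max_features on whatever estimator currently fills the est slot, while the bare key est swaps out the step itself.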

Note that for this specific case (where all the estimators share the same parameters), you can merge the grids:

grid = {'est': [DecisionTreeClassifier(), RandomForestClassifier()],
        'est__max_features': ['sqrt', 'log2'],
        # more parameters shared by all estimators, prefixed with 'est__'
        }
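
To make this concrete, here is a minimal self-contained sketch of the merged-grid approach. The synthetic dataset and the train/test split are assumptions added for illustration, not part of the original question:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
import dask_searchcv as dcv

# Synthetic data standing in for the question's DataFrame (an assumption)
X, y = make_classification(n_samples=500, n_features=20, random_state=33)
train_data, test_data, train_target, test_target = train_test_split(
    X, y, test_size=0.25, random_state=33)

# One pipeline and one grid covering both estimators
pipeline = Pipeline([('est', DecisionTreeClassifier())])
grid = {'est': [DecisionTreeClassifier(), RandomForestClassifier()],
        'est__max_features': ['sqrt', 'log2']}

gs = dcv.GridSearchCV(pipeline, grid)
gs.fit(train_data, train_target)
print(gs.best_params_)
print(gs.score(test_data, test_target))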

As for why your delayed example didn't work: dask.delayed is for wrapping functions that don't themselves call dask code. Since you're calling fit on a dask_searchcv.GridSearchCV object (which uses dask to compute) inside the delayed function (which also uses dask to compute), you're nesting calls to the dask scheduler, which can lead to poor performance at best and weird bugs at worst.
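
If you do want to keep the searches as separate objects, a pattern that avoids the nesting problem is simply to loop over them and let each search parallelize its own internal work. A minimal sketch, reusing gs_list, train_data, train_target, and test_data from the question:

# Each dask_searchcv.GridSearchCV already farms its candidate fits out to
# the dask scheduler, so a plain Python loop keeps the cores busy without
# nesting one scheduler call inside another.
for gs in gs_list:
    gs.fit(train_data, train_target)
    predicted = gs.predict(test_data)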

answered Oct 21 '22 by jiminy_crist