
Spark CrossValidatorModel access other models than the bestModel?

I am using Spark 1.6.1.

Currently I am using a CrossValidator to train my ML Pipeline with various parameters. After the training process I can use the bestModel property of the CrossValidatorModel to get the model that performed best during cross validation. Are the other models of the cross validation automatically discarded, or can I select a model that performed worse than the bestModel?

I am asking because I am using the F1 score as the cross validation metric, but I am also interested in the weightedRecall of all of the models, not just of the model that performed best during cross validation.

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val folds = 6
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new MulticlassClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(folds)

// fit on the training data (trainDf assumed) to obtain the CrossValidatorModel
val cvModel = cv.fit(trainDf)

val avgF1Scores = cvModel.avgMetrics

val predictedDf = cvModel.bestModel.transform(testDf)

// Here I would like to predict as well with the other models of the cross validation
asked Aug 10 '16 by MeiSign

2 Answers

Spark >= 2.4.0 (>= 2.3.0 in Scala)

SPARK-21088 (CrossValidator, TrainValidationSplit should collect all models when fitting) adds support for collecting submodels.

cv = CrossValidator(..., collectSubModels=True)

model = cv.fit(...)
model.subModels
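
Once the submodels are collected, each one can be scored with any evaluator, which covers the weightedRecall use case from the question. A minimal sketch, assuming the pipeline, paramGrid, trainDf, and testDf from the question:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                    numFolds=6,
                    collectSubModels=True)
cvModel = cv.fit(trainDf)

recall_evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall")

# subModels is a nested list: one entry per fold, one model per param map
for i, fold_models in enumerate(cvModel.subModels):
    for j, model in enumerate(fold_models):
        recall = recall_evaluator.evaluate(model.transform(testDf))
        print("fold %d, paramMap %d: weightedRecall = %f" % (i, j, recall))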

Spark < 2.4

If you want to access all of the intermediate models you'll have to create a custom cross validator from scratch: o.a.s.ml.tuning.CrossValidator discards the other models, and only the best model and the metrics are copied to the CrossValidatorModel.

See also Pyspark - Get all parameters of models created with ParamGridBuilder

answered Sep 18 '22 by 5 revs, 3 users 55%


If you're just looking to do this for experimentation rather than for a production implementation, I recommend monkey-patching. Here is what I did to print out the intermediate training results. Just use CrossValidatorVerbose as a drop-in replacement for CrossValidator.

import numpy as np

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.sql.functions import rand


class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        # each fold occupies an interval of width h in the [0, 1) rand column
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        # running sum of the metric for each param map; averaged at the end
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            # rows whose rand value falls in this fold's interval form the validation set
            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                metric = eva.evaluate(model.transform(validation, paramMap))
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

        # pick the param map with the best accumulated metric
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        # refit the winning configuration on the full dataset
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))

NOTE: this solution also corrects what I see as a bug in v2.0.0, where CrossValidatorModel.avgMetrics is set to the sum of the metrics instead of the average.
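
For reference, here is a usage sketch that could produce a run like the one shown below (the ratings DataFrame and its column names are assumptions, not part of the original answer):

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder

# `ratings` is a hypothetical DataFrame with userId, movieId, and rating columns
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")
paramGrid = (ParamGridBuilder()
             .addGrid(als.rank, [5])
             .addGrid(als.maxIter, [10])
             .addGrid(als.regParam, [0.1, 0.01, 0.001])
             .build())

cv = CrossValidatorVerbose(estimator=als,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol="rating",
                                                         metricName="rmse"),
                           numFolds=5)
cvModel = cv.fit(ratings)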

Here is an example of the output for a simple 5-fold validation of ALS:

Comparing models on fold 1
params: {'regParam': 0.1, 'rank': 5, 'maxIter': 10}     rmse: 1.122425  avg: 1.122425
params: {'regParam': 0.01, 'rank': 5, 'maxIter': 10}    rmse: 1.123537  avg: 1.123537
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 1.123651  avg: 1.123651
Comparing models on fold 2
params: {'regParam': 0.1, 'rank': 5, 'maxIter': 10}     rmse: 0.992541  avg: 1.057483
params: {'regParam': 0.01, 'rank': 5, 'maxIter': 10}    rmse: 0.992541  avg: 1.058039
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 0.992541  avg: 1.058096
Comparing models on fold 3
params: {'regParam': 0.1, 'rank': 5, 'maxIter': 10}     rmse: 1.141786  avg: 1.085584
params: {'regParam': 0.01, 'rank': 5, 'maxIter': 10}    rmse: 1.141786  avg: 1.085955
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 1.141786  avg: 1.085993
Comparing models on fold 4
params: {'regParam': 0.1, 'rank': 5, 'maxIter': 10}     rmse: 0.954110  avg: 1.052715
params: {'regParam': 0.01, 'rank': 5, 'maxIter': 10}    rmse: 0.952955  avg: 1.052705
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 0.952873  avg: 1.052713
Comparing models on fold 5
params: {'regParam': 0.1, 'rank': 5, 'maxIter': 10}     rmse: 1.140098  avg: 1.070192
params: {'regParam': 0.01, 'rank': 5, 'maxIter': 10}    rmse: 1.139589  avg: 1.070082
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 1.139535  avg: 1.070077
Best model:
params: {'regParam': 0.001, 'rank': 5, 'maxIter': 10}   rmse: 1.070077
answered Sep 21 '22 by Mack