Spark LinearRegressionSummary "normal" summary

According to LinearRegressionSummary (Spark 2.1.0 JavaDoc), p-values are only available for the "normal" solver.

This value is only available when using the "normal" solver.

What the hell is the "normal" solver?

I'm doing this:

import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.regression.LinearRegressionModel 
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame, SparkSession}
    .
    .
    .
val (trainingData, testData): (DataFrame, DataFrame) = 
  com.acme.pta.accuracy.Util.splitData(output, testProportion)
    .
    .
    .
val lr = 
  new org.apache.spark.ml.regression.LinearRegression()
  .setSolver("normal").setMaxIter(maxIter)

val pipeline = new Pipeline()
  .setStages(Array(lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.elasticNetParam, Array(0.2, 0.4, 0.8, 0.9))
  .addGrid(lr.regParam, Array(0.6, 0.3, 0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds) // Use 3+ in practice

val cvModel: CrossValidatorModel = cv.fit(trainingData)

val pipelineModel: PipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel: LinearRegressionModel = 
  pipelineModel.stages(0).asInstanceOf[LinearRegressionModel]

val modelSummary = lrModel.summary
Holder.log.info("lrModel.summary: " + modelSummary)
try {
  Holder.log.info("feature p values: ")
  // Exception occurs on line below.
  val featuresAndPValues = features.zip(lrModel.summary.pValues)
  featuresAndPValues.foreach(
    (featureAndPValue: (String, Double)) => 
    Holder.log.info(
      "feature: " + featureAndPValue._1 + ": " + featureAndPValue._2))
} catch {
  case _: java.lang.UnsupportedOperationException 
            => Holder.log.error("Cannot compute p-values")
}

I am still getting the UnsupportedOperationException.

The exception message is:

No p-value available for this LinearRegressionModel

Is there something else I need to be doing? I'm using

  "org.apache.spark" %% "spark-mllib" % "2.1.1"

Is pValues supported in that version?

asked Oct 11 '17 by Paul Reiners

1 Answer

Updated

tl;dr

Solution 1

In the stock LinearRegression, pValues and the other "normal" statistics are only present when at least one of the parameters elasticNetParam or regParam is zero. So you can change your grid to use

.addGrid( lr.elasticNetParam, Array( 0.0 ) )

or

.addGrid( lr.regParam, Array( 0.0 ) )
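
For example, with the first option your grid becomes (a minimal sketch, keeping your regParam values):

val paramGrid = new ParamGridBuilder()
    .addGrid( lr.elasticNetParam, Array( 0.0 ) ) // a zero here forces the Cholesky path
    .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
    .build()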

Solution 2

Make a custom version of LinearRegression that explicitly uses:

  1. the "normal" solver for regression, and
  2. the Cholesky solver for WeightedLeastSquares.

I made this class as an extension to the ml.regression package; it has to live under org.apache.spark.ml.regression to access the package-private APIs it uses.

package org.apache.spark.ml.regression

import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.optim.WeightedLeastSquares
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._

class CholeskyLinearRegression ( override val uid: String )
    extends Regressor[ Vector, CholeskyLinearRegression, LinearRegressionModel ]
    with LinearRegressionParams with DefaultParamsWritable with Logging {

    import CholeskyLinearRegression._

    def this() = this(Identifiable.randomUID("linReg"))

    def setRegParam(value: Double): this.type = set(regParam, value)
    setDefault(regParam -> 0.0)

    def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
    setDefault(fitIntercept -> true)

    def setStandardization(value: Boolean): this.type = set(standardization, value)
    setDefault(standardization -> true)

    def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
    setDefault(elasticNetParam -> 0.0)

    def setMaxIter(value: Int): this.type = set(maxIter, value)
    setDefault(maxIter -> 100)

    def setTol(value: Double): this.type = set(tol, value)
    setDefault(tol -> 1E-6)

    def setWeightCol(value: String): this.type = set(weightCol, value)

    def setSolver(value: String): this.type = set(solver, value)
    setDefault(solver -> Auto)

    def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
    setDefault(aggregationDepth -> 2)

    override protected def train(dataset: Dataset[_]): LinearRegressionModel = {

        // Extract the number of features before deciding optimization solver.
        val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
        val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))

        val instances: RDD[Instance] = 
            dataset
            .select( col( $(labelCol) ), w, col( $(featuresCol) ) )
            .rdd.map {
                case Row(label: Double, weight: Double, features: Vector) =>
                Instance(label, weight, features)
            }

        // if (($(solver) == Auto &&
        //   numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal) {
        // For low dimensional data, WeightedLeastSquares is more efficient since the
        // training algorithm only requires one pass through the data. (SPARK-10668)

        val optimizer = new WeightedLeastSquares( 
            $(fitIntercept), 
            $(regParam),
            elasticNetParam = $(elasticNetParam), 
            $(standardization),  // standardizeFeatures
            true,                // standardizeLabel
            solverType = WeightedLeastSquares.Cholesky,  // force Cholesky so aaInv/diagInvAtWA is computed
            maxIter = $(maxIter), 
            tol = $(tol)
        )

        val model = optimizer.fit(instances)

        val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept))
        val (summaryModel, predictionColName) = lrModel.findSummaryModelAndPredictionCol()

        val trainingSummary = new LinearRegressionTrainingSummary(
            summaryModel.transform(dataset),
            predictionColName,
            $(labelCol),
            $(featuresCol),
            summaryModel,
            model.diagInvAtWA.toArray,
            model.objectiveHistory
        )

        lrModel
        .setSummary( Some( trainingSummary ) )

        lrModel
    }

    override def copy(extra: ParamMap): CholeskyLinearRegression = defaultCopy(extra)
}

object CholeskyLinearRegression 
    extends DefaultParamsReadable[CholeskyLinearRegression] {

    override def load(path: String): CholeskyLinearRegression = super.load(path)

    val MAX_FEATURES_FOR_NORMAL_SOLVER: Int = WeightedLeastSquares.MAX_NUM_FEATURES

    /** String name for "auto". */
    private[regression] val Auto = "auto"

    /** String name for "normal". */
    private[regression] val Normal = "normal"

    /** String name for "l-bfgs". */
    private[regression] val LBFGS = "l-bfgs"

    /** Set of solvers that LinearRegression supports. */
    private[regression] val supportedSolvers = Array(Auto, Normal, LBFGS)
}

All you have to do is paste it into a separate file in your project (keeping the org.apache.spark.ml.regression package declaration) and change LinearRegression to CholeskyLinearRegression in your code.

val lr = new CholeskyLinearRegression() // new LinearRegression()
        .setSolver( "normal" )
        .setMaxIter( maxIter )

It works with non-zero params and gives pValues. Tested on the following param grid:

val paramGrid = new ParamGridBuilder()
        .addGrid( lr.elasticNetParam, Array( 0.2, 0.4, 0.8, 0.9 ) )
        .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
        .build()

Full investigation

I initially thought the main issue was that the model is not fully preserved: the trained model is not kept after fitting in CrossValidator, which is understandable given the memory consumption. There is an ongoing debate on how it should be resolved (see the issue in JIRA).

You can see in the commented section that I tried to extract the parameters from the best model in order to run it again. Then I found out that the model summary is fine; it's just that for some parameter combinations diagInvAtWA has length 1 and is essentially a single zero.

For ridge regression, i.e. Tikhonov regularization (elasticNetParam = 0), pValues and the other "normal" statistics can be computed for any regParam, but for the Lasso method and anything in between (elastic net) they cannot. The same goes for regParam = 0: pValues were computed with any elasticNetParam.
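
You can see this for yourself with a quick sketch (assuming a training DataFrame like the one in the reproduction below):

// elasticNetParam = 0.0 (ridge): Cholesky path, pValues are available
val ridge = new LinearRegression()
    .setSolver( "normal" ).setRegParam( 0.3 ).setElasticNetParam( 0.0 )
println( ridge.fit( training ).summary.pValues.mkString( ", " ) )

// both params non-zero (elastic net): QuasiNewton path, pValues throw
val elastic = new LinearRegression()
    .setSolver( "normal" ).setRegParam( 0.3 ).setElasticNetParam( 0.5 )
// elastic.fit( training ).summary.pValues // UnsupportedOperationException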

Why is that

LinearRegression uses the WeightedLeastSquares optimizer for the "normal" solver, with solverType = WeightedLeastSquares.Auto. This optimizer has two options for solvers: QuasiNewton or Cholesky. The former is selected only when both regParam and elasticNetParam are non-zero:

val solver = if (
    ( solverType == WeightedLeastSquares.Auto && 
        elasticNetParam != 0.0 && 
        regParam != 0.0 ) ||
    ( solverType == WeightedLeastSquares.QuasiNewton ) ) {

    ...
    new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun)
} else {
    new CholeskySolver
}

So with your parameter grid the QuasiNewtonSolver will always be used, because there is no combination in which either regParam or elasticNetParam is zero.

We know that in order to get pValues and other "normal" statistics such as the t-statistics or the standard errors of the coefficients, the diagonal of the matrix (A^T * W * A)^-1 (diagInvAtWA) must not be a vector consisting of a single zero. This condition is set in the definition of pValues.
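
Paraphrasing the guard in Spark's LinearRegressionSummary (not the verbatim source):

lazy val pValues: Array[Double] = {
    if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) {
        throw new UnsupportedOperationException(
            "No p-value available for this LinearRegressionModel")
    }
    ...
}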

diagInvAtWA is a vector of the diagonal elements of a packed upper triangular matrix (solution.aaInv):

val diagInvAtWA = solution.aaInv.map { inv => ...

For the Cholesky solver it is calculated, but for QuasiNewton it is not: the second parameter of NormalEquationSolution is this matrix, and the QuasiNewton path leaves it empty, so diagInvAtWA falls back to a single zero.

You technically could make your own version of LinearRegression that always uses the Cholesky solver, which is exactly what Solution 2 above does.

Reproduction

In this example I used the sample_linear_regression_data.txt dataset that ships with Spark under data/mllib.

Full code of reproduction

import org.apache.spark._

import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.evaluation.{RegressionEvaluator, BinaryClassificationEvaluator}
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.regression.{LinearRegressionModel, LinearRegression}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.param.ParamMap

object Main {

    def main( args: Array[ String ] ): Unit = {

        val spark =
            SparkSession
            .builder()
            .appName( "SO" )
            .master( "local[*]" )
            .config( "spark.driver.host", "localhost" )
            .getOrCreate()

        import spark.implicits._

        val data = 
            spark
            .read
            .format( "libsvm" )
            .load( "./sample_linear_regression_data.txt" )

        val Array( training, test ) = 
            data
            .randomSplit( Array( 0.9, 0.1 ), seed = 12345 )

        val maxIter = 10

        val lr = new LinearRegression()
            .setSolver( "normal" )
            .setMaxIter( maxIter )

        val paramGrid = new ParamGridBuilder()
            // .addGrid( lr.elasticNetParam, Array( 0.2, 0.4, 0.8, 0.9 ) )
            .addGrid( lr.elasticNetParam, Array( 0.0 ) )
            .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
            .build()

        val pipeline = new Pipeline()
            .setStages( Array( lr ) )

        val cv = new CrossValidator()
            .setEstimator( pipeline )
            .setEvaluator( new RegressionEvaluator )
            .setEstimatorParamMaps( paramGrid )
            .setNumFolds( 2 )  // Use 3+ in practice

        val cvModel = 
            cv
            .fit( training )

        val pipelineModel: PipelineModel = 
            cvModel
            .bestModel
            .asInstanceOf[ PipelineModel ]

        val lrModel: LinearRegressionModel = 
            pipelineModel
            .stages( 0 )
            .asInstanceOf[ LinearRegressionModel ]

        // Technically there is a way to use the exact ParamMap
        // to build a new LR, but for simplicity I'll
        // get and set the params explicitly

        // lrModel.params.foreach( ( param ) => {

        //     println( param )
        // } )

        // val bestLr = new LinearRegression()
        //     .setSolver( "normal" )
        //     .setMaxIter( maxIter )
        //     .setRegParam( lrModel.getRegParam )
        //     .setElasticNetParam( lrModel.getElasticNetParam )

        // val bestLrModel = bestLr.fit( training )

        val modelSummary = 
            lrModel
            .summary

        println( "lrModel pValues: " + modelSummary.pValues.mkString( ", " ) )

        spark.stop()
    }
}

Original

There are three solver algorithms available:

  • l-bfgs - the limited-memory Broyden–Fletcher–Goldfarb–Shanno algorithm, a quasi-Newton optimization method.
  • normal - uses the Normal Equation as an analytical solution to the linear regression problem (written out just below this list). It is basically a weighted least squares or iteratively reweighted least squares approach.
  • auto - the solver algorithm is selected automatically: the Normal Equation solver is used when possible, with an automatic fallback to iterative optimization methods when needed.
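
For reference, the Normal Equation gives the weighted least squares coefficients in closed form: coefficients = (A^T * W * A)^-1 * A^T * W * y, where A is the feature matrix, W the diagonal weight matrix, and y the label vector.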

The coefficientStandardErrors, tValues and pValues are only available when using the "normal" solver because they are all based on diagInvAtWA - the diagonal of the matrix (A^T * W * A)^-1.
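
Roughly, paraphrasing the standard formulas behind the summary: stdError_i = sqrt(sigma^2 * diagInvAtWA_i) with sigma^2 the residual variance, tValue_i = coefficient_i / stdError_i, and pValue_i the two-sided tail probability of tValue_i under a Student's t distribution. All three start from diagInvAtWA, so none of them can be produced when the solver does not compute it.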

answered Oct 19 '22 by Dennis Tsoi