Spark.ml regressions do not calculate same models as scikit-learn

I am setting up a very simple logistic regression problem in scikit-learn and in spark.ml, and the results diverge: the two libraries learn different models, and I can't figure out why (the data, the model type, and the regularization are all the same).

No doubt I am missing some setting on one side or the other. Which setting? How should I set up either scikit or spark.ml to find the same model as its counterpart?

I give the sklearn code and spark.ml code below. Both should be ready to cut-and-paste and run.

scikit-learn code:

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape

# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))

l = 0.3
e = LogisticRegression(
    fit_intercept=False,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# => [[ 0.98662189  0.45571052 -0.23467255]]

# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=False,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# =>[ 0.32155545  0.17904355  0.41222418]

spark.ml code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object TestSparkRegression {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val sparkTrainingData = new SQLContext(sc)
      .createDataFrame(Seq(
        LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
        LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
        LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
        LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
      .toDF("label", "features")

    val logisticModel = new LogisticRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(sparkTrainingData)

    println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
    // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987

    val linearModel = new LinearRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setSolver("l-bfgs")
      .fit(sparkTrainingData)

    println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
    // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323

    sc.stop()
  }
}
Frank asked Mar 10 '17 23:03



1 Answer

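To get matching models, two things need to be aligned:

  1. Standardize the features on both sides before fitting. Spark standardizes the features internally by default, which changes the scale on which the penalty acts, so the sklearn inputs have to be standardized too. Take care with the standard deviation formula: sklearn's StandardScaler divides by n (population standard deviation), while Spark's StandardScaler divides by n-1 (sample standard deviation), so one side has to be rescaled by sqrt((n-1)/n).

  2. Rescale the regularization parameter. For logistic regression, Spark minimizes the average log-loss (the denominator is the sum of the instance weights, which equals the number of training instances when all weights are 1), whereas sklearn minimizes the sum of the log-loss. For linear regression, Spark puts a 1/(2n) factor in front of the sum of squared errors, unlike sklearn. Spark's regParam therefore has to be scaled down accordingly: by 1/10 for logistic regression and by 1/20 for linear regression in this example, where n = 10.

With these changes, the logistic regression coefficients below agree to about six decimal places and the linear regression coefficients to within about 1e-3.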
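As a rough illustration of the two adjustments, the sketch below uses only the Python standard library and the first feature column from the question. It shows that the population and sample standard deviations differ by exactly sqrt((n-1)/n), and it reproduces the regularization values used in this answer. The variable names are illustrative only, and the scaling rules are the ones stated in this answer, not taken from either library's API.

```python
import math
import statistics

# First feature column from the question's data (n = 10 rows).
x1 = [-0.7306653538519616, 0.6750417712898752, 0.1863463229359709,
      -0.6719842051493347, 0.9699938346531928, 0.22759406190283604,
      0.9688721028330911, 0.5993795346650845, 0.9219423508390701,
      0.7006904841584055]
n = len(x1)

# sklearn's StandardScaler divides by the population std (ddof=0);
# Spark's StandardScaler divides by the corrected sample std (ddof=1).
pop_std = statistics.pstdev(x1)
sample_std = statistics.stdev(x1)

# Ratio between the two conventions: sqrt((n-1)/n), i.e. 3/sqrt(10) here.
factor = math.sqrt((n - 1) / n)
print(pop_std / sample_std, factor)

# Regularization scaling as described in this answer (illustrative names).
l = 0.3
sklearn_C = 1 / l                 # sklearn's C is the inverse penalty strength
spark_reg_logistic = l / n        # 0.3 / 10
spark_reg_linear = l / (2 * n)    # 0.3 / 20
print(sklearn_C, spark_reg_logistic, spark_reg_linear)
```

Multiplying sklearn-standardized features by `factor` is the same as dividing the centered features by the sample standard deviation, which is why the sklearn code multiplies by 3.0/np.sqrt(10.0).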

Scikit-learn code

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape


from sklearn.preprocessing import StandardScaler

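# sklearn's StandardScaler divides by the population std (ddof=0), Spark's by the
# sample std (ddof=1); the factor sqrt((m-1)/m) = 3/sqrt(10) reproduces Spark's scaling
Xsc = StandardScaler().fit_transform(X) * np.sqrt((m - 1.0) / m)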

l = 0.3
e = LogisticRegression(
    fit_intercept=True,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11,
    solver='lbfgs',
    verbose=1)

e.fit(Xsc, y)

print(e.coef_, e.intercept_)
# => [[ 0.82122437 0.32615256]] [-0.01181534]

#e.get_params(deep=True)

# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=True,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(Xsc, y)

print(e.coef_, e.intercept_)
# =>[ 0.21310109 0.09203616] 0.5

Spark Code (refactored to use ML API)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler

val sparkTrainingData_orig = new SQLContext(sc).
  createDataFrame(Seq(
    (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
    (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
    (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
    (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
    (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
    (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
    (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
    (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
    (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
    (0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
  toDF("label", "features_orig")

val sparkTrainingData=new StandardScaler().
  setWithMean(true).
  setInputCol("features_orig").
  setOutputCol("features").
  fit(sparkTrainingData_orig).
  transform(sparkTrainingData_orig)

//Make regularization 0.3/10=0.03
val logisticModel = new LogisticRegression().
  setRegParam(0.03).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142

//Make regularization 0.3/20=0.015    
val linearModel = new LinearRegression().
  setRegParam(0.015).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
// Spark linear model coefficients: [0.21394341729353747,0.09257340293212045] Intercept: 0.5
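To see how closely the two libraries end up agreeing, the values printed in the sklearn and Spark outputs above can be compared directly. The following is a minimal sketch with the numbers copied from those outputs; the helper `max_abs_diff` is a hypothetical name introduced only for this check.

```python
# Coefficients followed by the intercept, copied from the outputs above.
sklearn_logistic = [0.82122437, 0.32615256, -0.01181534]
spark_logistic = [0.8212244419577079, 0.32615245441495727, -0.011815325216668142]
sklearn_linear = [0.21310109, 0.09203616, 0.5]
spark_linear = [0.21394341729353747, 0.09257340293212045, 0.5]


def max_abs_diff(a, b):
    """Largest element-wise absolute difference between two equal-length lists."""
    return max(abs(x - y) for x, y in zip(a, b))


logistic_gap = max_abs_diff(sklearn_logistic, spark_logistic)
linear_gap = max_abs_diff(sklearn_linear, spark_linear)
print(f"logistic: {logistic_gap:.1e}, linear: {linear_gap:.1e}")
```

The logistic gap is on the order of 1e-7, while the linear gap is a little under 1e-3, so the linear regression match is approximate and not exact.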
Dhanesh answered Oct 25 '22 16:10