I am setting up a very simple logistic regression problem in scikit-learn and in spark.ml, and the results diverge: the models they learn are different, but I can't figure out why (data is the same, model type is the same, regularization is the same...).
No doubt I am missing some setting on one side or the other. Which setting? How should I set up either scikit or spark.ml to find the same model as its counterpart?
I give the sklearn code and spark.ml code below. Both should be ready to cut-and-paste and run.
import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge
X = np.array([
[-0.7306653538519616, 0.0],
[0.6750417712898752, -0.4232874171873786],
[0.1863463229359709, -0.8163423997075965],
[-0.6719842051493347, 0.0],
[0.9699938346531928, 0.0],
[0.22759406190283604, 0.0],
[0.9688721028330911, 0.0],
[0.5993795346650845, 0.0],
[0.9219423508390701, -0.8972778242305388],
[0.7006904841584055, -0.5607635619919824]
])
y = np.array([
0.0,
1.0,
1.0,
0.0,
1.0,
1.0,
1.0,
0.0,
0.0,
0.0
])
m, n = X.shape
# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))
l = 0.3
e = LogisticRegression(
fit_intercept=False,
penalty='l2',
C=1/l,
max_iter=100,
tol=1e-11)
e.fit(X_with_intercept, y)
print(e.coef_)
# => [[ 0.98662189 0.45571052 -0.23467255]]
# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
fit_intercept=False,
alpha=l,
max_iter=100,
tol=1e-11)
e.fit(X_with_intercept, y)
print(e.coef_)
# =>[ 0.32155545 0.17904355 0.41222418]
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext
object TestSparkRegression {
def main(args: Array[String]): Unit = {
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sparkTrainingData = new SQLContext(sc)
.createDataFrame(Seq(
LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
.toDF("label", "features")
val logisticModel = new LogisticRegression()
.setRegParam(0.3)
.setLabelCol("label")
.setFeaturesCol("features")
.fit(sparkTrainingData)
println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987
val linearModel = new LinearRegression()
.setRegParam(0.3)
.setLabelCol("label")
.setFeaturesCol("features")
.setSolver("l-bfgs")
.fit(sparkTrainingData)
println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
// Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323
sc.stop()
}
}
You need to do the following:
1. Standardize the features on both sides first. Spark's estimators standardize features internally by default, while sklearn's do not, so feed explicitly standardized data to both. Also take care of the difference in standard deviation formulas: sklearn's StandardScaler divides by the population standard deviation (1/n), whereas Spark's divides by the sample standard deviation (1/(n-1)).
2. Match the loss scaling. For logistic regression, Spark minimizes the average log-loss (the denominator is the sum of the instance weights, i.e. the number of training instances when all weights are 1), whereas sklearn minimizes the sum of the log-losses. For linear regression, Spark puts a 1/(2n) factor in front of the sum-of-squared-errors term that sklearn does not. Spark's regParam therefore has to be scaled down accordingly: by 1/10 for logistic regression and by 1/20 for linear regression in this example (the arithmetic is sketched just below).
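To make the scaling concrete, here is a minimal sketch of the conversion arithmetic (plain Python; the variable names are only illustrative), assuming the 10-row training set used above:
n = 10        # number of training rows
reg = 0.3     # regularization strength on the sklearn side (1/C, or Ridge alpha)
# Logistic regression: sklearn penalizes the sum of the per-row log-losses,
# Spark penalizes their average, so divide the sklearn strength by n.
spark_logistic_regparam = reg / n        # 0.3 / 10 = 0.03
# Linear regression: Spark puts a 1/(2n) factor in front of the squared-error
# term that sklearn's Ridge does not have, so divide by 2n instead.
spark_linear_regparam = reg / (2 * n)    # 0.3 / 20 = 0.015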
Scikit-learn code
import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge
X = np.array([
[-0.7306653538519616, 0.0],
[0.6750417712898752, -0.4232874171873786],
[0.1863463229359709, -0.8163423997075965],
[-0.6719842051493347, 0.0],
[0.9699938346531928, 0.0],
[0.22759406190283604, 0.0],
[0.9688721028330911, 0.0],
[0.5993795346650845, 0.0],
[0.9219423508390701, -0.8972778242305388],
[0.7006904841584055, -0.5607635619919824]
])
y = np.array([
0.0,
1.0,
1.0,
0.0,
1.0,
1.0,
1.0,
0.0,
0.0,
0.0
])
m, n = X.shape
from sklearn.preprocessing import StandardScaler
## sklearn's StandardScaler divides by the population (1/n) standard deviation,
## Spark's divides by the sample (1/(n-1)) standard deviation, so multiply by
## sqrt((n-1)/n) = 3/sqrt(10) here (n = 10) to reproduce Spark's scaling
Xsc = StandardScaler().fit_transform(X) * 3.0 / np.sqrt(10.0)
l = 0.3
e = LogisticRegression(
fit_intercept=True,
penalty='l2',
C=1/l,
max_iter=100,
tol=1e-11,
solver='lbfgs', verbose=1)
e.fit(Xsc, y)
print(e.coef_, e.intercept_)
# => [[ 0.82122437 0.32615256]] [-0.01181534]
#e.get_params(deep=True)
# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
fit_intercept=True,
alpha=l,
max_iter=100,
tol=1e-11)
e.fit(Xsc, y)
print(e.coef_, e.intercept_)
# =>[ 0.21310109 0.09203616] 0.5
Spark Code (refactored to use ML API)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler
val sparkTrainingData_orig = new SQLContext(sc).
createDataFrame(Seq(
(0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
(1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
(1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
(0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
(1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
(1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
(1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
(0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
(0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
(0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
toDF("label", "features_orig")
val sparkTrainingData=new StandardScaler().
setWithMean(true).
setInputCol("features_orig").
setOutputCol("features").
fit(sparkTrainingData_orig).
transform(sparkTrainingData_orig)
//Make regularization 0.3/10=0.03
val logisticModel = new LogisticRegression().
setRegParam(0.03).
setLabelCol("label").
setFeaturesCol("features").
setTol(1e-12).
setMaxIter(100).
fit(sparkTrainingData)
println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142
//Make regularization 0.3/20=0.015
val linearModel = new LinearRegression().
setRegParam(0.015).
setLabelCol("label").
setFeaturesCol("features").
setTol(1e-12).
setMaxIter(100).
fit(sparkTrainingData)
println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
// Spark linear model coefficients: [0.21394341729353747,0.09257340293212045] Intercept: 0.5
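With these settings the two libraries converge to essentially the same models: the logistic coefficients and intercepts agree to about six or seven decimal places, and the linear ones to roughly two or three.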