I'm working in PySpark, and I'd like to find a way to perform linear regressions on groups of data. Specifically, given this dataframe:
import pandas as pd
pdf = pd.DataFrame({'group_id': [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
                    'x': [0, 1, 2, 0, 1, 5, 2, 3, 4, 5],
                    'y': [2, 1, 0, 0, 0.5, 2.5, 3, 4, 5, 6]})
df = sqlContext.createDataFrame(pdf)
df.show()
# +--------+-+---+
# |group_id|x|  y|
# +--------+-+---+
# |       1|0|2.0|
# |       1|1|1.0|
# |       1|2|0.0|
# |       2|0|0.0|
# |       2|1|0.5|
# |       2|5|2.5|
# |       3|2|3.0|
# |       3|3|4.0|
# |       3|4|5.0|
# |       3|5|6.0|
# +--------+-+---+
I'd now like to be able to fit a separate y ~ ax + b model for each group_id, and output a new dataframe with columns a and b and a row for each group. For instance, for group 1 I could do:
from sklearn import linear_model
# Regression on group_id = 1
data = df.where(df.group_id == 1).toPandas()
regr = linear_model.LinearRegression()
regr.fit(data.x.values.reshape(len(data), 1), data.y.values.reshape(len(data), 1))
a = regr.coef_[0][0]
b = regr.intercept_[0]
print('For group 1, y = {0}*x + {1}'.format(a, b))
# Repeat for group_id=2, group_id=3
But doing this for each group involves bringing the data back to the driver one by one, which doesn't take advantage of any Spark parallelism.
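Spelled out as a loop, that serial approach looks something like this (each toPandas() call pulls one group's rows back to the driver, so the fits run one after another):
# Serial driver-side version: one round trip per group
results = []
for gid in [1, 2, 3]:
    data = df.where(df.group_id == gid).toPandas()
    regr = linear_model.LinearRegression()
    regr.fit(data.x.values.reshape(len(data), 1), data.y.values.reshape(len(data), 1))
    results.append((gid, regr.coef_[0][0], regr.intercept_[0]))
pd.DataFrame(results, columns=['group_id', 'a', 'b'])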
The easiest way to fit such a line in Apache Spark is to use org.apache.spark.mllib.regression.LinearRegressionModel. A more sophisticated approach is LinearRegressionWithSGD, where SGD stands for Stochastic Gradient Descent. For reasons beyond the scope of this document, suffice it to say that SGD is better suited to certain analytics problems than others.
Here's a solution I found. Instead of performing separate regressions on each group of data, create one sparse matrix with separate columns for each group:
from pyspark.mllib.regression import LabeledPoint, SparseVector
# Label points for regression
def groupid_to_feature(group_id, x, num_groups):
    intercept_id = num_groups + group_id - 1
    # Need a vector containing x and a '1' for the intercept term
    return SparseVector(num_groups*2, {group_id-1: x, intercept_id: 1.0})
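# e.g. group 2's point x=1.0 (with 3 groups) lands in slot 1 (slope) and slot 4 (intercept):
# groupid_to_feature(2, 1.0, 3)  ->  SparseVector(6, {1: 1.0, 4: 1.0})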
labelled = df.rdd.map(lambda line: LabeledPoint(line[2],
                                                groupid_to_feature(line[0], line[1], 3)))
labelled.take(5)
# [LabeledPoint(2.0, (6,[0,3],[0.0,1.0])),
#  LabeledPoint(1.0, (6,[0,3],[1.0,1.0])),
#  LabeledPoint(0.0, (6,[0,3],[2.0,1.0])),
#  LabeledPoint(0.0, (6,[1,4],[0.0,1.0])),
#  LabeledPoint(0.5, (6,[1,4],[1.0,1.0]))]
Then use Spark's LinearRegressionWithSGD to run the regression:
from pyspark.mllib.regression import LinearRegressionModel, LinearRegressionWithSGD
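# intercept=False below: each group's intercept is already encoded as one of
# the sparse features (the constant-1 slot added in groupid_to_feature)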
lrm = LinearRegressionWithSGD.train(labelled, iterations=5000, intercept=False)
The weights from this regression contain the coefficient and intercept for each group_id, i.e.
lrm.weights
# DenseVector([-1.0, 0.5, 1.0014, 2.0, 0.0, 0.9946])
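# layout: weights[0:3] are the slopes a for groups 1, 2, 3;
# weights[3:6] are the corresponding intercepts b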
or reshaped into a DataFrame to give a and b for each group:
pd.DataFrame(lrm.weights.toArray().reshape(2, 3).transpose(), columns=['a','b'], index=[1,2,3])
#           a             b
# 1 -0.999990  1.999986e+00
# 2  0.500000  5.270592e-11
# 3  1.001398  9.946426e-01
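For completeness: on newer Spark versions (3.0+), where the RDD-based mllib API used above is deprecated, the same per-group fit can be parallelised with a grouped pandas UDF instead. A minimal sketch, assuming Spark >= 3.0 with pyarrow installed (fit_group is just an illustrative name, not part of any API):
from sklearn import linear_model

def fit_group(pdf):
    # pdf is a pandas DataFrame holding all rows for one group_id;
    # fit y ~ a*x + b on just that group, executor-side
    regr = linear_model.LinearRegression()
    regr.fit(pdf.x.values.reshape(-1, 1), pdf.y.values)
    return pd.DataFrame({'group_id': [pdf.group_id.iloc[0]],
                         'a': [regr.coef_[0]],
                         'b': [regr.intercept_]})

df.groupBy('group_id').applyInPandas(
    fit_group, schema='group_id long, a double, b double').show()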