Grouped linear regression in Spark

I'm working in PySpark, and I'd like to find a way to perform linear regressions on groups of data. Specifically, given this DataFrame:

import pandas as pd
pdf = pd.DataFrame({'group_id':[1,1,1,2,2,2,3,3,3,3],
                    'x':[0,1,2,0,1,5,2,3,4,5],
                    'y':[2,1,0,0,0.5,2.5,3,4,5,6]})
df = sqlContext.createDataFrame(pdf)  # on Spark 2.x+: spark.createDataFrame(pdf)

df.show()
# +--------+-+---+
# |group_id|x|  y|
# +--------+-+---+
# |       1|0|2.0|
# |       1|1|1.0|
# |       1|2|0.0|
# |       2|0|0.0|
# |       2|1|0.5|
# |       2|5|2.5|
# |       3|2|3.0|
# |       3|3|4.0|
# |       3|4|5.0|
# |       3|5|6.0|
# +--------+-+---+

I'd now like to be able to fit a separate y ~ ax + b model for each group_id and output a new dataframe with columns a and b and a row for each group.

For instance, for group 1 I could do:

from sklearn import linear_model
# Regression on group_id = 1
data = df.where(df.group_id == 1).toPandas()
regr = linear_model.LinearRegression()
regr.fit(data.x.values.reshape(-1, 1), data.y.values.reshape(-1, 1))
a = regr.coef_[0][0]
b = regr.intercept_[0]
print('For group 1, y = {0}*x + {1}'.format(a, b))
# Repeat for group_id=2, group_id=3

But doing this for each group involves bringing the data back to the driver one by one, which doesn't take advantage of any Spark parallelism.
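For reference, on Spark 3.0+ a grouped pandas UDF can run each per-group fit in parallel on the executors rather than on the driver. A minimal sketch, assuming Spark 3.0+ with pyarrow available (fit_group is an illustrative helper name, not anything from the original question):

import numpy as np
import pandas as pd

# Sketch: fit y ~ a*x + b separately for each group, in parallel on executors
def fit_group(pdf):
    # np.polyfit with degree 1 returns (slope, intercept) for this group
    a, b = np.polyfit(pdf['x'], pdf['y'], 1)
    return pd.DataFrame({'group_id': [pdf['group_id'].iloc[0]],
                         'a': [a], 'b': [b]})

(df.groupBy('group_id')
   .applyInPandas(fit_group, schema='group_id long, a double, b double')
   .show())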

asked Nov 30 '15 by tobycoleman



1 Answer

Here's a solution I found. Instead of performing a separate regression on each group of data, build one sparse feature matrix with separate slope and intercept columns for each group, so that a single regression fits all groups at once:

from pyspark.mllib.regression import LabeledPoint, SparseVector

# Build a feature vector for each point: column (group_id - 1) holds x
# (the slope term for that group) and column (num_groups + group_id - 1)
# holds a constant 1.0 (the intercept term for that group)
def groupid_to_feature(group_id, x, num_groups):
    intercept_id = num_groups + group_id - 1
    return SparseVector(num_groups * 2, {group_id - 1: x, intercept_id: 1.0})

# df.map worked on Spark 1.x; df.rdd.map works on both old and new versions
labelled = df.rdd.map(lambda row: LabeledPoint(row.y,
                      groupid_to_feature(row.group_id, row.x, 3)))

labelled.take(5)
# [LabeledPoint(2.0, (6,[0,3],[0.0,1.0])),
#  LabeledPoint(1.0, (6,[0,3],[1.0,1.0])),
#  LabeledPoint(0.0, (6,[0,3],[2.0,1.0])),
#  LabeledPoint(0.0, (6,[1,4],[0.0,1.0])),
#  LabeledPoint(0.5, (6,[1,4],[1.0,1.0]))]

Then use Spark's LinearRegressionWithSGD to run the regression. Note intercept=False: each group's intercept is already encoded as a feature column, so the model should not fit an additional global intercept:

from pyspark.mllib.regression import LinearRegressionModel, LinearRegressionWithSGD
lrm = LinearRegressionWithSGD.train(labelled, iterations=5000, intercept=False)
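One caveat worth adding beyond the original answer: SGD convergence here depends on the iterations and step arguments of LinearRegressionWithSGD.train, so if the recovered weights look far from the exact least-squares fit, increase iterations or tune step.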

The weights from this regression contain the coefficient and intercept for each group_id: the first three entries are the slopes a for groups 1-3 and the last three are the intercepts b, i.e.

lrm.weights
# DenseVector([-1.0, 0.5, 1.0014, 2.0, 0.0, 0.9946])

or reshaped into a DataFrame to give a and b for each group:

pd.DataFrame(lrm.weights.toArray().reshape(2, 3).transpose(),
             columns=['a', 'b'], index=[1, 2, 3])
#           a              b
# 1 -0.999990   1.999986e+00
# 2  0.500000   5.270592e-11
# 3  1.001398   9.946426e-01
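As a quick sanity check, the group-1 weights above can be compared against an exact least-squares fit computed locally (np.polyfit is just one convenient way to get it):

import numpy as np
# Group 1 has x=[0,1,2], y=[2,1,0], so the exact fit is y = -1*x + 2,
# matching weights[0] (slope) and weights[3] (intercept) above
a, b = np.polyfit([0, 1, 2], [2.0, 1.0, 0.0], 1)
print(a, b)  # approximately -1.0 and 2.0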
answered Sep 22 '22 by tobycoleman