
Polynomial regression in spark/ or external packages for spark

After spending a good amount of time searching the net on this topic, I am posting here in the hope of getting some pointers. Please read further.

After analyzing Spark 2.0, I concluded that polynomial regression is not possible with Spark alone. So, is there some extension to Spark which can be used for polynomial regression?

- With SparkR it could be done (but I am looking for a better alternative).
- RFormula in Spark does prediction, but the coefficients are not available (which is my main requirement, as I am primarily interested in the coefficient values).

asked Aug 10 '16 by sourabh

2 Answers

Polynomial regression is just a special case of linear regression (see Polynomial regression is linear regression and Polynomial regression). Since Spark has a method for linear regression, you can call that method after transforming the inputs so that the new inputs are the ones suited to polynomial regression. For instance, if you only have one independent variable x and you want to do quadratic regression, you have to change your independent input matrix to [x x^2].
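
As a minimal sketch (not part of the original answer), assuming a DataFrame df with a Double feature column x and a Double label column y (hypothetical names), the expansion and the fit could look like this:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.functions.{col, pow}

// Add the squared term, so the feature matrix becomes [x x^2]
val expanded = df.withColumn("x2", pow(col("x"), 2.0))

// Assemble the two columns into a single feature vector
val assembler = new VectorAssembler()
  .setInputCols(Array("x", "x2"))
  .setOutputCol("features")

val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("y")

val model = lr.fit(assembler.transform(expanded))
println(model.coefficients) // coefficients of x and x^2
println(model.intercept)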

answered Nov 19 '22 by David Masip


I would like to add some information to @Mehdi Lamrani's answer:

If you want to do polynomial linear regression in SparkML, you may use the class PolynomialExpansion. For more information, check the class in the SparkML Doc or in the Spark API Doc.
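
As a quick illustration (an assumed toy example, not from the original answer, given an existing SparkSession named spark), a PolynomialExpansion of degree 2 turns a vector (x, y) into (x, x*x, y, x*y, y*y):

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(2.0, 3.0))
)).toDF("features")

val polyExp = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(2)

polyExp.transform(data).show(false)
// [2.0,3.0] -> [2.0,4.0,3.0,6.0,9.0]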

Here is an implementation example:

Let's assume we have train and test datasets, stored in two CSV files, with headers containing the names of the columns (features, label). Each dataset contains three features named f1, f2, f3, each of type Double (this is the X matrix), as well as a label column (the Y vector) named myLabel.

For this code I used Spark with Scala (Scala version 2.12.8, Spark version 2.4.0).

We assume that the Spark ML library is already declared as a dependency in build.sbt.
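
For reference, a minimal build.sbt sketch for the versions above (the exact artifacts and versions are assumptions to adapt to your setup):

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.4.0",
  "org.apache.spark" %% "spark-mllib" % "2.4.0"  // brings in the Spark ML (DataFrame-based) API
)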

First of all, import the libraries:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{PolynomialExpansion, VectorAssembler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}

Create the Spark session and reuse its Spark context (creating a second context with new SparkContext(conf) would fail, since the session already holds one):

val ss = SparkSession.builder()
  .master("local")
  .appName("Read CSV")
  .enableHiveSupport()
  .getOrCreate()

val sc = ss.sparkContext

Instantiate the variables you are going to use:

val f_train:String = "path/to/your/train_file.csv"
val f_test:String = "path/to/your/test_file.csv"
val degree:Int = 3 // Set the degree of your choice
val maxIter:Int = 10 // Set the max number of iterations
val lambda:Double = 0.0 // Regularization parameter (regParam)
val alpha:Double = 0.3 // ElasticNet mixing parameter (elasticNetParam)

First, let's create several UDFs, which will be used for data reading and pre-processing. The type parameters of the udf toFeatures are the return type Vector followed by the types of the feature arguments: (Double, Double, Double).

val toFeatures = udf[Vector, Double, Double, Double] {
  (a, b, c) => Vectors.dense(a, b, c)
}

val encodeIntToDouble = udf[Double, Int](_.toDouble)

Now let's create a function which extracts the data from a CSV file and creates new features from the existing ones, using PolynomialExpansion:

def getDataPolynomial(
    currentfile:String,
    sc:SparkSession,
    sco:SparkContext, // not used here, kept to match the call site
    degree:Int
    ):DataFrame =
{
    val df_rough:DataFrame = sc.read
      .format("csv")
      .option("header", "true") // first line in file has headers
      .option("mode", "DROPMALFORMED")
      .option("inferSchema", value = true)
      .load(currentfile)
      .toDF("f1", "f2", "f3", "myLabel") // optional renaming of the columns

    // Assemble the three features into one Vector column and cast the label to Double
    val df:DataFrame = df_rough
        .withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
        .withColumn("label", encodeIntToDouble(df_rough("myLabel")))

    // Expand the feature vector into polynomial features of the chosen degree
    val polyExpansion = new PolynomialExpansion()
      .setInputCol("featNormTemp")
      .setOutputCol("polyFeatures")
      .setDegree(degree)

    val polyDF:DataFrame = polyExpansion.transform(df.select("featNormTemp"))

    val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))

    // Re-attach the label by joining back on the original feature vector
    val datafixedWithFeaturesLabel = datafixedWithFeatures
          .join(df, df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
          .select("label", "polyFeatures")

    datafixedWithFeaturesLabel
}

Now run the function on both the train and test datasets, using the chosen degree for the polynomial expansion.

val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)

Run the algorithm to obtain a linear regression model, using a pipeline:

val assembler = new VectorAssembler()
   .setInputCols(Array("polyFeatures"))
   .setOutputCol("features2")

val lr = new LinearRegression()
    .setMaxIter(maxIter)
    .setRegParam(lambda)
    .setElasticNetParam(alpha)
    .setFeaturesCol("features2")
    .setLabelCol("label")

// Fit the model:
val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
val lrModel:PipelineModel = pipeline.fit(X)

// Get predictions on the test set:
val result:DataFrame = lrModel.transform(X_test)
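
Since the coefficient values were the main requirement in the question, they can be read from the fitted pipeline model; a small sketch, assuming the pipeline above where the linear regression is the last stage:

import org.apache.spark.ml.regression.LinearRegressionModel

val fittedLr = lrModel.stages.last.asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${fittedLr.coefficients}")
println(s"Intercept: ${fittedLr.intercept}")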

Finally, evaluate the result using the root mean squared error (the function below returns the square root of the mean squared error):

def leastSquaresError(result:DataFrame):Double = {
    val rm:RegressionMetrics = new RegressionMetrics(
        result
            .select("label","prediction")
            .rdd
            .map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
    Math.sqrt(rm.meanSquaredError)
}

val error:Double = leastSquaresError(result)
println("Error : "+error)

I hope this is useful!

answered Nov 19 '22 by Catalina Chircu