Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Convert ML VectorUDT features from .mllib to .ml type for linear regression

I am using Spark cluster and I would like to implement linear regression by executing this code :

data = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/FileStore/tables/w4s3yhez1497323663423/basma.csv/")

data.cache()  # Cache data for faster reuse
data.count()

from pyspark.mllib.regression import LabeledPoint

# convenience for specifying schema
data = data.select("creat0", "gfr0m").rdd.map(lambda r: LabeledPoint(r[1], [r[0]])) \
    .toDF()
display(data)

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["creat0", "gfr0m"], outputCol="features")

(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=100)

trainingData.cache()
testData.cache()

print "Training Data : ", trainingData.count()
print "Test Data : ", testData.count()

data.collect()

from pyspark.ml.regression import LinearRegression

lr = LinearRegression()
# Fit 2 models, using different regularization parameters
modelA = lr.fit(data, {lr.regParam: 0.0})
modelB = lr.fit(data, {lr.regParam: 100.0})


# Make predictions
 predictionsA = modelA.transform(data)
  display(predictionsA)
   from pyspark.ml.evaluation import RegressionEvaluator
  evaluator = RegressionEvaluator(metricName="rmse")
  RMSE = evaluator.evaluate(predictionsA)
 print("ModelA: Root Mean Squared Error = " + str(RMSE))

# ModelA: Root Mean Squared Error = 128.602026843
 predictionsB = modelB.transform(data)
 RMSE = evaluator.evaluate(predictionsB)
  print("ModelB: Root Mean Squared Error = " + str(RMSE))

     # ModelB: Root Mean Squared Error = 129.496300193
  # Import numpy, pandas, and ggplot
  import numpy as np
 from pandas import *
   from ggplot import *


 But its give me this error:

IllegalArgumentException: u'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.

After googling this error I found an answer that says:

use from pyspark.ml.linalg import Vectors, VectorUDT

instead of

from pyspark.mllib.linalg import Vectors, VectorUDT 

or

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf

and a function:

  as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())

With example data:

from pyspark.mllib.linalg import Vectors as MLLibVectors

df = sc.parallelize([
    (MLLibVectors.sparse(4, [0, 2], [1, -1]),),
    (MLLibVectors.dense([1, 2, 3, 4]),)
]).toDF(["features"])

result = df.withColumn("features", as_ml("features"))

But still I have the same error:

here some data :

cause ,"weight0","dbp0","gfr0m" 1,"90","10","22.72" 5,"54","10","16.08" 6,"66","9","25.47" 3,"110","11","32.95" 5,"62","11","20.3" 5,"65","8","28.94" 1,"65","8","15.88" 5,"96","8","38.09" 5,"110","8","41.64" 4,"68","8","25.85" 5,"68","7","37.77" 1,"82","9.5","16.25" 5,"76","10","37.55" 5,"56","","37.06" 1,"93","8","18.26" 5,"80","7.5","48.49" 1,"73","8","38.37" 4,"76","8","31.09" 1,"68","8","39.62" 1,"82","8","40.08" 1,"76","9.5","28.2" 5,"81","10","36.66" 2,"80","","47.1" 5,"91","10","16.59" 2,"58","8","49.22" 1,"76","7","38.98" ,"61","8","21.8" 5,"50","6","26.97" 1,"83","7","27.81" 1,"86","8","48.62" ,"77","6","46.78" 5,"64","6","34.17" 5,"58","6","38.95" 1,"73","6","7.63" 5,"86","8","32.46" 1,"50","6","35.98" 5,"90","7","32.26" 5,"42","7","17.3" 1,"88","7","25.61" 5,"110","","" 1,"84","6","31.4" 5,"68","8","53.25" 1,"96","8","52.65" 6,"74","8","40.77" 1,"70","9.5","22.35" 6,"54","8","20.16" 1,"52","13","32.61" ,"84","8","52.98" 5,"90","9","28.67"

like image 208
B.doc Avatar asked Oct 18 '22 11:10

B.doc


1 Answers

Here you go, you just need to alias the VectorUDT from pyspark.ml :

from pyspark.mllib.linalg import Vectors as MLLibVectors
from pyspark.ml.linalg import VectorUDT as VectorUDTML
from pyspark.sql.functions import udf

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())

df = sc.parallelize([
    (MLLibVectors.sparse(4, [0, 2], [1, -1]),),
    (MLLibVectors.dense([1, 2, 3, 4]),)
]).toDF(["features"])

result = df.withColumn("features", as_ml("features"))

result.show()
# +--------------------+
# |            features|
# +--------------------+
# |(4,[0,2],[1.0,-1.0])|
# |   [1.0,2.0,3.0,4.0]|
# +--------------------+

Of course the resulting DataFrame result isn't ready to be passed to LinearRegression as it doesn't have a label column but I trust you'd know how to deal with that.

like image 148
eliasah Avatar answered Oct 21 '22 05:10

eliasah