Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Random Forest Regression for categorical inputs on PySpark

I have been trying to do a simple random forest regression model on PySpark. I have a decent experience of Machine Learning on R. However, to me, ML on Pyspark seems completely different - especially when it comes to the handling of categorical variables, string indexing, and OneHotEncoding (When there are only numeric variables, I was able to perform RF regression just by following examples). While there are a lot of examples available for handling categorical variables, such as this and this, I have had no success with any of them as most of them went over my head (probably because of my unfamiliarity with Python ML). I will be grateful to anyone who can help fix this.

Here is my attempt: inputfile is here

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv')
train.cache()
train.dtypes

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]

Next I choose my variables of interest:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()

Output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]

My dependent variable is ConversionPayOut, previously a string type is now converted to a double-type.

From here starts my confusion: Based on this post, I understood I have to convert my categorical stringtype variables to onehot encoded vectors. Here is my attempt at that:

First a StringIndexing:

`

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(junk) for column in list(set(junk.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()

`

Output of StringIndexing:

`

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|`


Next, I think, I have to do the OneHOtEncoding of the String Indexes:

`

from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()

`

Out after one-hot encoding looks like this:

`

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|

`

I am clueless as to how to proceed forward. In fact, I am clueless about which Spark Machine Learning packages require us to do this one-hot encoding and which ones do not.

It would be really great learning for all the newbies of PySpark if the StackOverflow community could clarify on how to go forward.

like image 517
honeybadger Avatar asked Nov 08 '22 16:11

honeybadger


1 Answers

To run Random Forest on your pre-processed data you can proceed with below code.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

#use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier","Fraud","Country_index_Vec","TrafficType_index_Vec","Device_index_Vec","Browser_index_Vec","OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=assembler)
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", train_OHE.ConversionPayOut)

#randomly split data into training and test dataset
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed = 111)

# train RandomForest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# Make predictions on test data
predictions = rf_model.transform(test_data)


Hope this helps!

like image 161
1.618 Avatar answered Nov 15 '22 07:11

1.618