I have a Python class that I'm using to load and process some data in Spark. Among various things I need to do, I'm generating a list of dummy variables derived from various columns in a Spark dataframe. My problem is that I'm not sure how to properly define a User Defined Function to accomplish what I need.
I do currently have a method that, when mapped over the underlying dataframe RDD, solves half the problem (remember that this is a method in a larger data_processor class):
def build_feature_arr(self, table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender':['1','2'], ..}
    # there are actually two different dataframes that I need to do this for;
    # this just specifies which one I'm looking at and grabs the relevant
    # features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activty':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat == column_value else 0 for cat in cats]
        return result
    return _build_feature_arr
Essentially, for the specified dataframe, this takes the categorical variable values for the specified columns and returns a list of the values of the new dummy variables. That means the following code:
data = data_processor(init_args)
result = data.user_data.rdd.map(data.build_feature_arr('users'))
returns something like:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
This is exactly what I want in terms of generating the list of dummy variables I want, but here's my question: How can I either (a) make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose), or (b) take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?
Either way, what I need to do is generate a new dataframe containing the columns from user_data, along with a new column (let's call it feature_array) containing the output of the function above (or something functionally equivalent).
We can define functions in PySpark as we would in plain Python, but they are not directly compatible with a Spark DataFrame. To apply such a function to a DataFrame, we need to wrap it in a UDF (user-defined function). The disadvantage is that UDFs can be slow, because they are applied row by row.
We now have two indexed and encoded features. Most machine learning algorithms in Spark expect a single numerical feature vector as input. To produce one, we use VectorAssembler, whose job is to combine raw features and features generated by various transformers into a single feature vector.
Spark >= 2.3, >= 3.0
Since Spark 2.3, OneHotEncoder is deprecated in favor of OneHotEncoderEstimator. If you use a recent release, please modify the encoder code:
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],
    outputCols=["gender_vector"]
)
In Spark 3.0 this variant has been renamed to OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
    inputCols=["gender_numeric"],
    outputCols=["gender_vector"]
)
Additionally, StringIndexer has been extended to support multiple input columns:
StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
Spark < 2.3
Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

# sc is the SparkContext that the PySpark shell provides by default
row = Row("gender", "foo", "bar")
df = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
First of all, StringIndexer:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()
## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
Next, OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()
## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
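The empty vector (1,[],[]) in the output above can look surprising. It follows from the encoder's dropLast parameter, which is on by default: the last category gets no slot of its own and is represented by an all-zero vector. Below is a minimal pure-Python sketch of that behaviour; one_hot is a hypothetical helper written for illustration, not part of pyspark, and it returns a dense list rather than a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Emulate one-hot encoding of a category index as a dense list of floats."""
    # With drop_last, the vector has one slot fewer than there are categories.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # The last category has no slot when drop_last is set, so it stays all zeros.
    if index < size:
        vec[index] = 1.0
    return vec


# Two gender categories, as in the example data frame.
first = one_hot(0, 2)   # corresponds to (1,[0],[1.0])
second = one_hot(1, 2)  # corresponds to (1,[],[])
```

Passing dropLast=False to the encoder should keep a slot for every category, which the sketch mirrors with drop_last=False.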
VectorAssembler:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")
final_df = assembler.transform(encoded_df)
If bar contained categorical variables you could use VectorIndexer to set the required metadata:
from pyspark.ml.feature import VectorIndexer
vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))
but that is not the case here.
Finally you can wrap all of that using pipelines:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
Arguably it is a much more robust and cleaner approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
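To illustrate the consistency caveat: by default StringIndexer assigns indices by descending label frequency, so fitting it separately on two datasets can map the same label to different indices. The following is a pure-Python emulation of that ordering, not Spark code; fit_index and the sample lists are hypothetical and exist only for illustration.

```python
from collections import Counter


def fit_index(values):
    """Emulate frequency-ordered indexing: the most frequent label gets 0.0."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically so the
    # result is deterministic.
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(labels)}


train = ["a", "a", "b"]
test = ["b", "b", "a"]

train_index = fit_index(train)  # "a" is most frequent here
test_index = fit_index(test)    # "b" is most frequent here

# Consistent encoding: apply the mapping fitted on train to test as well.
test_encoded = [train_index[label] for label in test]
```

In Spark terms this suggests fitting the indexer (or the whole pipeline) once and reusing the fitted model to transform every dataset, rather than refitting per dataset.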
Another way to get a comparable output is RFormula, which:
RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.
from pyspark.ml.feature import RFormula
rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)
As you can see, it is much more concise, but it is harder to compose and doesn't allow much customization. Nevertheless, the result for a simple pipeline like this one will be identical:
final_df_rf.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
final_df.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
Regarding your questions:
make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)
It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
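As a rough sketch of what such a UDF could look like, assuming the dummy coding from the question and an array of integers as the return type: make_dummy_encoder and add_feature_array are hypothetical names, and the categories dict and column list are whatever your config provides. The encoding logic is kept in plain Python so it can be checked without a cluster.

```python
def make_dummy_encoder(categories, columns):
    """Build a plain function mapping column values to a flat 0/1 list."""
    def encode(*values):
        result = []
        for col, value in zip(columns, values):
            # Mirror the question's normalisation: compare lower-cased strings.
            value = str(value).lower()
            result += [1 if cat == value else 0 for cat in categories[col]]
        return result
    return encode


def add_feature_array(df, categories, columns, out_col="feature_array"):
    """Return df with an array<int> column of dummy variables appended."""
    # Imported lazily so the encoder above stays usable without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, IntegerType

    encode_udf = F.udf(make_dummy_encoder(categories, columns),
                       ArrayType(IntegerType()))
    return df.withColumn(out_col, encode_udf(*[F.col(c) for c in columns]))
```

To call the same function from a SQL query, it could be registered with spark.udf.register under a name of your choice, passing the same return type.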
take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)
Note:
For Spark 1.x replace pyspark.ml.linalg with pyspark.mllib.linalg.
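If the goal is to keep the existing user_data columns alongside the new one, one possible approach is to extend each row inside the map and rebuild the dataframe with an extended schema. This is only a sketch under assumptions: extend_row and with_feature_array are hypothetical helpers, feature_fn stands for the function returned by build_feature_arr in the question, and the new column is stored as an array of integers rather than a vector.

```python
def extend_row(values, features):
    """Append the feature list as one extra field to a row's values."""
    return tuple(values) + (list(features),)


def with_feature_array(df, feature_fn, out_col="feature_array"):
    """Return df plus an array<int> column computed by feature_fn(row)."""
    # Imported lazily so extend_row can be exercised without Spark installed.
    from pyspark.sql.types import (ArrayType, IntegerType, StructField,
                                   StructType)

    # Original fields first, then the new array column at the end.
    schema = StructType(
        df.schema.fields + [StructField(out_col, ArrayType(IntegerType()), True)]
    )
    return df.rdd.map(lambda row: extend_row(row, feature_fn(row))).toDF(schema)
```

With the names from the question, usage might look like with_feature_array(data.user_data, data.build_feature_arr('users')).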