One-hot encoding of multiple string categorical features using Spark DataFrames

My goal is to one-hot encode a list of categorical columns using Spark DataFrames, similar to what the get_dummies() function does in Pandas.

The dataset, bureau.csv, was originally taken from the Kaggle competition Home Credit Default Risk. Here is my entry table example, say entryData, filtered so that only rows with KEY = 100001 are kept.

# primary key
KEY = 'SK_ID_CURR'
data = spark.read.csv("bureau.csv", header=True, inferSchema=True)
# sample data from bureau.csv of 1716428 rows
entryData = data.select([KEY] + columnList).where(F.col(KEY) == 100001)
entryData.show()
+----------+-------------+---------------+---------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY|    CREDIT_TYPE|
+----------+-------------+---------------+---------------+
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
+----------+-------------+---------------+---------------+

I'm looking to one-hot encode the columns in columnList by building the function catg_encode(entryData, columnList):

columnList = cols_type(entryData, obj=True)[1:]
print(columnList)
['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']

Note that cols_type() is a function that returns a list of either categorical columns (if obj=True) or numerical ones (if obj=False).
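
For reference, cols_type() is roughly the following (a simplified sketch of my helper based on df.dtypes; the real one may differ):

def cols_type(df, obj=True):
    # string columns are treated as categorical, everything else as numerical
    if obj:
        return [name for name, dtype in df.dtypes if dtype == 'string']
    return [name for name, dtype in df.dtypes if dtype != 'string']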

I have succeeded in one-hot encoding the first column, 'CREDIT_ACTIVE', but I couldn't do it for all the columns simultaneously, i.e. build the function catg_encode.

# import necessary modules
from pyspark.sql import functions as F

# look for all distinct categories within a given feature (here 'CREDIT_ACTIVE')
categories = entryData.select(columnList[0]).distinct().rdd.flatMap(lambda x: x).collect()
# one-hot encode the categories
exprs = [F.when(F.col(columnList[0]) == category, 1).otherwise(0).alias(category) for category in categories]
# nice table with the encoded feature 'CREDIT_ACTIVE'
oneHotEncode = entryData.select(KEY, *exprs)
oneHotEncode.show()
+----------+--------+----+------+------+
|SK_ID_CURR|Bad debt|Sold|Active|Closed|
+----------+--------+----+------+------+
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     1|     0|
|    100001|       0|   0|     1|     0|
|    100001|       0|   0|     1|     0|
+----------+--------+----+------+------+

Here the feature 'CREDIT_ACTIVE' has 4 distinct categories: ['Bad debt', 'Sold', 'Active', 'Closed'].

Note that I have even tried IndexToString and OneHotEncoderEstimator, but they didn't help for this specific task.
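
To make the goal concrete, a naive sketch of catg_encode would just loop my single-column code over columnList, as below. It assumes category labels never repeat across columns (otherwise the aliases collide) and it collects each column's distinct values to the driver, which I doubt scales to 1716428 rows:

def catg_encode(df, columns):
    exprs = []
    for column in columns:
        # collect the distinct categories of this column (driver-side!)
        categories = df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
        # assumption: no category label appears in two different columns
        exprs += [F.when(F.col(column) == cat, 1).otherwise(0).alias(cat)
                  for cat in categories]
    return df.select(KEY, *exprs)

catg_encode(entryData, columnList).show()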

I am expecting the output to look like the following:

+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|SK_ID_CURR|Bad debt|Sold|Active|Closed|currency 1|currency 2|currency 3|currency 4|..........|...
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|   
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
+----------+--------+----+------+------+----------+----------+----------+----------+----------+--- 

The trailing dots ... stand for the remaining categories of the feature 'CREDIT_TYPE', which are

['Loan for the purchase of equipment', 'Cash loan (non-earmarked)', 'Microloan', 'Consumer credit', 'Mobile operator loan', 'Another type of loan', 'Mortgage', 'Interbank credit', 'Loan for working capital replenishment', 'Car loan', 'Real estate loan', 'Unknown type of loan', 'Loan for business development', 'Credit card', 'Loan for purchase of shares (margin lending)'].

Note: I have seen the post E-num / get Dummies in pyspark, but it doesn't automate the process for many columns, as needed with big data. That post gives a solution that writes separate code for each categorical feature, which does not address my problem.

asked Nov 22 '19 by Joe

1 Answer

There are two ways to juice this particular lemon. Let's have a look at them.

  1. Pivoting and joining
import pyspark.sql.functions as f

df1 = spark.sparkContext.parallelize([
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100002, 'Active', 'currency 2', 'Consumer credit'],
]).toDF(['SK_ID_CURR', 'CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE'])

# this can be done dynamically, but I don't have all categories
categories = ['Active', 'Closed', 'Bad debt', 'Sold']

# we need to pivot without aggregation, so I need to add an `id` column and group by it as well
credit_groups = (
  df1.withColumn('id', f.monotonically_increasing_id())
     .groupBy('SK_ID_CURR', 'id')
     .pivot('CREDIT_ACTIVE', values=categories)
     .agg(f.lit(1))
     .drop('id')
)

# currency groups are just a 1 for each currency and ID, as per the example data
# if this is not the case, something more clever needs to be here 
currency_groups = df1.groupBy('SK_ID_CURR').pivot('CREDIT_CURRENCY').agg(f.lit(1))

# join the two pivoted tables on the ID and fill nulls to zeroes
credit_groups.join(currency_groups, on=['SK_ID_CURR'], how='inner').na.fill(0).show()

+----------+------+------+--------+----+----------+----------+
|SK_ID_CURR|Active|Closed|Bad debt|Sold|currency 1|currency 2|
+----------+------+------+--------+----+----------+----------+
|    100002|     1|     0|       0|   0|         0|         1|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
+----------+------+------+--------+----+----------+----------+
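
If you need this for an arbitrary columnList rather than two hard-coded columns, the same pivot-per-column idea can be wrapped in a loop and the results chained with joins. A sketch (the helper name one_hot_pivot and the per-row id are my own; chaining one join per column may get expensive for very wide data):

from functools import reduce

def one_hot_pivot(df, key, columns):
    # an id keeps duplicate rows apart, since pivot needs a group-by
    df = df.withColumn('_id', f.monotonically_increasing_id())
    # pivot each categorical column separately
    pivoted = [df.groupBy(key, '_id').pivot(c).agg(f.lit(1)) for c in columns]
    # chain the per-column pivots together with inner joins
    joined = reduce(lambda a, b: a.join(b, on=[key, '_id'], how='inner'), pivoted)
    return joined.drop('_id').na.fill(0)

one_hot_pivot(df1, 'SK_ID_CURR',
              ['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']).show()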
  2. Using StringIndexer and OneHotEncoderEstimator, as follows:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

# no need to fit the indexers here; the Pipeline's fit() takes care of that
indexers = [StringIndexer(inputCol=column, outputCol=column + "_NUMERIC") for column in ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']]

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df1).transform(df1)
df_indexed.show()

+----------+-------------+---------------+---------------+---------------------+-----------------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY|    CREDIT_TYPE|CREDIT_ACTIVE_NUMERIC|CREDIT_CURRENCY_NUMERIC|
+----------+-------------+---------------+---------------+---------------------+-----------------------+
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100002|       Active|     currency 2|Consumer credit|                  1.0|                    1.0|
+----------+-------------+---------------+---------------+---------------------+-----------------------+

And from here on out, you can apply one-hot encoding to your newly created numerical columns. I personally recommend route 1, as it is more readable. Route 2, however, also allows you to chain OneHotEncoderEstimator into the declared Pipeline, making the whole thing executable in one line after declaration; a sketch of that follows.
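
For completeness, here is what that chaining could look like. Note that OneHotEncoderEstimator produces one sparse-vector column per feature rather than the separate 0/1 columns of route 1, and that in Spark 3.0+ the class is simply called OneHotEncoder:

catg_cols = ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']
encoder = OneHotEncoderEstimator(
    inputCols=[c + "_NUMERIC" for c in catg_cols],
    outputCols=[c + "_ONEHOT" for c in catg_cols],
    dropLast=False,  # keep every category, as get_dummies does
)
pipeline = Pipeline(stages=indexers + [encoder])
pipeline.fit(df1).transform(df1).show()

Hope this helps.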

answered Oct 13 '22 by napoleon_borntoparty