Background: I'm doing a simple binary classification using RandomForestClassifier from pyspark.ml. Before feeding the data to training, I use VectorIndexer to decide whether features should be numerical or categorical by providing the maxCategories argument.
Problem: Even though I used VectorIndexer with maxCategories set to 30, I still got an error while fitting the training pipeline:
An error occurred while calling o15371.fit.
: java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 0 has 10765 values. Considering remove this and other categorical features with a large number of values, or add more training examples.
My code is simple: col_idx is a list of column names I generated that will be passed to StringIndexer, col_all is a list of column names that will be passed to StringIndexer and OneHotEncoder, and col_num holds the numeric column names.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, IndexToString, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
my_data.cache()
# stringindexers and encoders
stIndexers = [StringIndexer(inputCol = Col, outputCol = Col + 'Index').setHandleInvalid('keep') for Col in col_idx]
encoder = OneHotEncoderEstimator(inputCols = [Col + 'Index' for Col in col_all], outputCols = [Col + 'ClassVec' for Col in col_all]).setHandleInvalid('keep')
# vector assembler
col_into_assembler = [cols + 'Index' for cols in col_idx] + [cols + 'ClassVec' for cols in col_all] + col_num
assembler = VectorAssembler(inputCols = col_into_assembler, outputCol = "features")
# featureIndexer, labelIndexer, rf classifier and labelConverter
featureIndexer = VectorIndexer(inputCol = "features", outputCol = "indexedFeatures", maxCategories = 30)
# columns with at most maxCategories distinct values => categorical features, columns with more => numerical / continuous features; a smaller value means fewer categorical features, a larger value means more.
labelIndexer = StringIndexer(inputCol = "label", outputCol = "indexedLabel").fit(my_data)
rf = RandomForestClassifier(featuresCol = "indexedFeatures", labelCol = "indexedLabel")
labelConverter = IndexToString(inputCol = "prediction", outputCol = "predictedLabel", labels=labelIndexer.labels)
# chain all the estimators and transformers stages into a Pipeline estimator
rfPipeline = Pipeline(stages = stIndexers + [encoder, assembler, featureIndexer, labelIndexer, rf, labelConverter])
# split data, cache them
training, test = my_data.randomSplit([0.7, 0.3], seed = 100)
training.cache()
test.cache()
# fit the estimator with training dataset to get a compiled pipeline with transformers and fitted models.
ModelRF = rfPipeline.fit(training)
# make predictions
predictions = ModelRF.transform(test)
predictions.printSchema()
predictions.show(5)
So my question is: how come there is still a categorical feature with a huge number of levels in my data even though I set maxCategories to 30 in VectorIndexer? I can set maxBins in the RF classifier to a higher value, but I'm just curious: why isn't VectorIndexer working as expected (well, as I expected), i.e. treating features with at most maxCategories distinct values as categorical and the rest as numerical?
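For reference, the maxBins workaround I mentioned would look roughly like this (11000 is just an illustrative value large enough to cover the 10765-value feature, not a recommendation):
from pyspark.ml.classification import RandomForestClassifier
# maxBins must be at least as large as the number of values in the largest
# categorical feature (10765 in the error above); 11000 is an arbitrary example
rf = RandomForestClassifier(featuresCol = "indexedFeatures", labelCol = "indexedLabel", maxBins = 11000)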
This helps process a dataset of unknown vectors into a dataset with some continuous features and some categorical features. The choice between continuous and categorical is based upon a maxCategories parameter. Set maxCategories to the maximum number of categories any categorical feature should have.
A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0.
A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer.
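As a minimal illustration of that maxCategories rule (my own sketch, not quoted from the docs): a fitted VectorIndexerModel only records category maps for features whose number of distinct values is at most maxCategories, e.g.
from pyspark.ml.feature import VectorAssembler, VectorIndexer
# two features: f1 has 2 distinct values, f2 has 10
df = spark.range(10).selectExpr("cast(id % 2 as double) as f1", "cast(id as double) as f2")
assembled = VectorAssembler(inputCols=["f1", "f2"], outputCol="features").transform(df)
model = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=5).fit(assembled)
model.categoryMaps
# {0: {0.0: 0, 1.0: 1}} -- only feature 0 (2 values) is treated as categorical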
It looks like, contrary to the documentation, which lists:
Preserve metadata in transform; if a feature's metadata is already present, do not recompute.
among its TODOs, metadata is already preserved.
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
df = spark.range(10)
stages = [
    StringIndexer(inputCol="id", outputCol="idx"),
    VectorAssembler(inputCols=["idx"], outputCol="features"),
    VectorIndexer(inputCol="features", outputCol="features_indexed", maxCategories=5)
]
Pipeline(stages=stages).fit(df).transform(df).schema["features"].metadata
# {'ml_attr': {'attrs': {'nominal': [{'vals': ['8',
# '4',
# '9',
# '5',
# '6',
# '1',
# '0',
# '2',
# '7',
# '3'],
# 'idx': 0,
# 'name': 'idx'}]},
# 'num_attrs': 1}}
Pipeline(stages=stages).fit(df).transform(df).schema["features_indexed"].metadata
# {'ml_attr': {'attrs': {'nominal': [{'ord': False,
# 'vals': ['0.0',
# '1.0',
# '2.0',
# '3.0',
# '4.0',
# '5.0',
# '6.0',
# '7.0',
# '8.0',
# '9.0'],
# 'idx': 0,
# 'name': 'idx'}]},
# 'num_attrs': 1}}
Under normal circumstances this is the desired behavior: you shouldn't use indexed categorical features as continuous variables.
But if you still want to circumvent this behavior, you'll have to reset the metadata, for example:
pipeline1 = Pipeline(stages=stages[:1])
pipeline2 = Pipeline(stages=stages[1:])
dft1 = pipeline1.fit(df).transform(df).withColumn("idx", col("idx").alias("idx", metadata={}))
dft2 = pipeline2.fit(dft1).transform(dft1)
dft2.schema["features_indexed"].metadata
# {'ml_attr': {'attrs': {'numeric': [{'idx': 0, 'name': 'idx'}]},
# 'num_attrs': 1}}
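Applied to the pipeline from the question, the same trick would look roughly like this (a rough sketch reusing the variable names above, not tested against your data; the same split-and-reset would have to be applied to the test set before transforming):
from pyspark.sql.functions import col
# run the StringIndexers first, wipe the nominal metadata they attach,
# then let VectorIndexer re-decide based on maxCategories alone
indexPipeline = Pipeline(stages = stIndexers)
indexedTraining = indexPipeline.fit(training).transform(training)
for c in col_idx:
    indexedTraining = indexedTraining.withColumn(c + 'Index', col(c + 'Index').alias(c + 'Index', metadata={}))
restPipeline = Pipeline(stages = [encoder, assembler, featureIndexer, labelIndexer, rf, labelConverter])
ModelRF = restPipeline.fit(indexedTraining)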