I'm wondering if there is a concise way to run ML (e.g. KMeans) on a DataFrame in PySpark when I have the features in multiple numeric columns, i.e. as in the Iris dataset:
(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa', binomial_label=1)
I'd like to use KMeans without recreating the dataset by manually adding the feature vector as a new column, with the original columns hardcoded repeatedly in the code.
The solution I'd like to improve:
```python
from pyspark.mllib.linalg import Vectors
from pyspark.sql.types import Row
from pyspark.ml.clustering import KMeans, KMeansModel

iris = sqlContext.read.parquet("/opt/data/iris.parquet")
iris.first()
# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa', binomial_label=1)

df = iris.map(lambda r: Row(
    id=r.id,
    a1=r.a1,
    a2=r.a2,
    a3=r.a3,
    a4=r.a4,
    label=r.label,
    binomial_label=r.binomial_label,
    features=Vectors.dense(r.a1, r.a2, r.a3, r.a4))
).toDF()

kmeans_estimator = KMeans()\
    .setFeaturesCol("features")\
    .setPredictionCol("prediction")

kmeans_transformer = kmeans_estimator.fit(df)
predicted_df = kmeans_transformer.transform(df).drop("features")
predicted_df.first()
# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, binomial_label=1, id=u'id_1', label=u'Iris-setosa', prediction=1)
```
I'm looking for a solution, which is something like:
```python
feature_cols = ["a1", "a2", "a3", "a4"]
prediction_col_name = "prediction"
<DataFrame-independent code for KMeans>
<A new DataFrame is created, extended with the `prediction` column.>
```
You can use VectorAssembler:
```python
from pyspark.ml.feature import VectorAssembler

ignore = ['id', 'label', 'binomial_label']
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore],
    outputCol='features')

assembler.transform(df)
It can be combined with k-means using an ML Pipeline:

```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, kmeans_estimator])
model = pipeline.fit(df)
```