Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

KMeans clustering in PySpark

I have a spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two columns: lat and long (latitude & longitude) using them as simple values). I want to extract 7 clusters based on just those 2 columns and then I want to attach the cluster asignment to my original dataframe. I've tried:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

But I am getting an error after a while:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?

like image 486
user3245256 Avatar asked Dec 01 '17 02:12

user3245256


People also ask

What is clustering in Pyspark?

K-means is one of the most commonly used clustering algorithms for grouping data into a predefined number of clusters. The spark. mllib includes a parallelized variant of the k-means++ method called kmeans||. The KMeans function from pyspark.

What is elbow method in K-means?

The elbow method runs k-means clustering on the dataset for a range of values for k (say from 1-10) and then for each value of k computes an average score for all clusters. By default, the distortion score is computed, the sum of square distances from each point to its assigned center.

What is Silhouette score in clustering?

The silhouette value is a measure of how similar an object is to its own cluster (cohesion) compared to other clusters (separation). The silhouette ranges from −1 to +1, where a high value indicates that the object is well matched to its own cluster and poorly matched to neighboring clusters.

What is meant by hierarchical clustering?

Hierarchical clustering, also known as hierarchical cluster analysis, is an algorithm that groups similar objects into groups called clusters. The endpoint is a set of clusters, where each cluster is distinct from each other cluster, and the objects within each cluster are broadly similar to each other.


1 Answers

Since, based on another recent question of yours, I guess you are in your very first steps with Spark clustering (you are even importing sqrt & array, without ever using them, probably because it is like that in the docs example), let me offer advice in a more general level rather than in the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions, trying to get your cluster assignments back into your dataframe)...

Since

  1. you have your data already in a dataframe

  2. you want to attach the cluster membership back into your initial dataframe

you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.

Step 0 - make some toy data resembling yours:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                              [1, 40.4, -20.5],
                              [2, 28., -23.9],
                              [3, 29.5, -19.0],
                              [4, 32.8, -18.84]
                             ],
                              ["other","lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

Step 1 - assemble your features

In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; and it provides a specific method for doing this, VectorAssembler:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

As perhaps already guessed, the argument inputCols serves to tell VectoeAssembler which particular columns in our dataframe are to be used as features.

Step 2 - fit your KMeans model

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features') here serves to tell the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat & long features are no more directly used.

Step 3 - transform your initial dataframe to include cluster assignments

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.

You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may be no longer necessary)...

Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...

like image 141
desertnaut Avatar answered Oct 01 '22 21:10

desertnaut