Spark MLlib KMeans from DataFrame, and back again

I aim to apply a k-means clustering algorithm to a very large data set using Spark MLlib (1.3.1). I have pulled the data from HDFS using a HiveContext in Spark, and would eventually like to put it back there the same way, in this format:

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |

I have run the following code, where "data" is a DataFrame of doubles, with an ID in the first column.

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)

This runs successfully, but I'm now stuck mapping the clusters back to their respective IDs, in a DataFrame as described above. I can convert the predictions to a DataFrame with:

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

But that's as far as I've got. This post is on the right track, and I think this post is asking a similar question to mine.

I suspect the LabeledPoint class is needed. Any comments or answers would be appreciated, cheers.

Edit: just found this in the Spark user list; it looks promising.

asked Jul 16 '15 by Michael Plazzer

2 Answers

I understand that you want to get a DataFrame at the end. I see two possible solutions; I'd say that choosing between them is a matter of taste.

Create column from RDD

It's very easy to obtain pairs of IDs and clusters in the form of an RDD:

    // Pair each row's ID with its feature vector
    val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
    // Train on the vectors alone, then predict a cluster for each vector
    val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
    val clustersRDD = clusters.predict(idPointRDD.map(_._2))
    // Zip the IDs back together with the predictions
    val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

Then you create a DataFrame from it:

    import sqlContext.implicits._  // needed for toDF on an RDD
    val idCluster = idClusterRDD.toDF("id", "cluster")

This works because map doesn't change the order of the data in an RDD, which is why you can simply zip the IDs with the prediction results.
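
Since you mentioned wanting to write the result back through the HiveContext, here is a minimal sketch of one way to do that on Spark 1.3 (the table name "id_cluster" is hypothetical):

    // Persist the (id, cluster) DataFrame as a Hive table; assumes sqlContext
    // is a HiveContext and "id_cluster" is a table name of your choosing
    idCluster.saveAsTable("id_cluster")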

Use UDF (User Defined Function)

The second method involves using the clusters.predict method as a UDF:

    // Broadcast the trained model so every executor gets its own copy
    val bcClusters = sc.broadcast(clusters)
    def predict(x: Double, y: Double): Int = {
        bcClusters.value.predict(Vectors.dense(x, y))
    }
    sqlContext.udf.register("predict", predict _)

Now we can use it to add predictions to the data:

    val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
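
Because the UDF is registered by name, the same thing can be expressed in raw SQL. A quick sketch, assuming data has the columns id, x and y as above ("points" is a hypothetical temp table name):

    data.registerTempTable("points")
    val idCluster = sqlContext.sql("SELECT id, predict(x, y) AS cluster FROM points")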

Keep in mind that the Spark API doesn't allow UDF deregistration, which means the closure data will be kept in memory.

Wrong / suboptimal solutions

  • Using clusters.predict without broadcasting

It won't work in a distributed setup. Edit: actually it will work; I was confused by the implementation of predict for RDDs, which uses broadcasting.

  • sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

toArray collects all the data on the driver, which means that in distributed mode you will be copying all the cluster IDs onto one node.

answered Nov 09 '22 by krcz

I'm doing something similar using PySpark. I'm guessing you could translate this directly to Scala, as there is nothing Python-specific. myPointsWithID is my RDD with an ID for each point, and the point represented as an array of values.

    # Get an RDD of only the vectors representing the points to be clustered
    points = myPointsWithID.map(lambda (id, point): point)
    clusters = KMeans.train(points,
                            100,
                            maxIterations=100,
                            runs=50,
                            initializationMode='random')

    # For each point in the original RDD, replace the point with the
    # ID of the cluster the point belongs to.
    clustersBC = sc.broadcast(clusters)
    pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))
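
For reference, here is a rough Scala translation of the same idea (a sketch only; the names mirror the Python above, and KMeans.train uses the old MLlib overload that takes runs and an initialization mode):

    import org.apache.spark.mllib.clustering.KMeans

    // myPointsWithID: RDD[(Long, Vector)] -- hypothetical, mirroring the Python RDD
    val points = myPointsWithID.map { case (_, point) => point }
    // k = 100, maxIterations = 100, runs = 50, initializationMode = "random"
    val clusters = KMeans.train(points, 100, 100, 50, "random")

    // Replace each point with the ID of the cluster it belongs to
    val clustersBC = sc.broadcast(clusters)
    val pointClusters = myPointsWithID.map { case (id, point) =>
        (id, clustersBC.value.predict(point))
    }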
answered Nov 09 '22 by soorajmr