 

Speed up collaborative filtering for large dataset in Spark MLLib

I'm using MLlib's matrix factorization to recommend items to users. I have a big implicit interaction matrix of about M = 20 million users and N = 50k items. After training the model I want to get a short list (e.g. 200) of recommendations for each user. I tried recommendProductsForUsers in MatrixFactorizationModel, but it's very slow (it ran for 9 hours and was still far from finishing; I'm testing with 50 executors, each with 8g memory). This might be expected, since recommendProductsForUsers needs to compute all M*N user-item scores and then take the top items for each user.

I'll try using more executors, but from what I saw on the application detail page in the Spark UI, I doubt it can finish within hours or even a day even if I had 1,000 executors (after 9 hours it was still in the flatMap here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L279-L289, with 10,000 total tasks and only ~200 finished). Are there any other things I can tune to speed up the recommendation process besides increasing the number of executors?
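To get a sense of the scale, here's a rough back-of-the-envelope sketch (plain Scala; the only inputs are the M, N, and rank figures above, and the FLOP count is only illustrative):

// Rough cost of recommendProductsForUsers with the sizes from my setup.
val numUsers = 20000000L   // M
val numItems = 50000L      // N
val rank = 20L
val scores = numUsers * numItems   // 1e12 user-item dot products
val flops  = scores * 2 * rank     // ~4e13 floating-point ops, before the per-user top-200 selection
println(s"$scores scores, roughly $flops FLOPs")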

Here is sample code:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// (user id, item id, interaction count) -> implicit-feedback ratings
val ratings = input.map(r => Rating(r.getString(0).toInt, r.getString(1).toInt, r.getLong(2).toDouble)).cache()
val rank = 20
val alpha = 40.0
val maxIter = 10
val lambda = 0.05
val checkpointInterval = 5
val als = new ALS()
    .setImplicitPrefs(true)
    .setCheckpointInterval(checkpointInterval)
    .setRank(rank)
    .setAlpha(alpha)
    .setIterations(maxIter)
    .setLambda(lambda)
val model = als.run(ratings)
val recommendations = model.recommendProductsForUsers(200)
recommendations.saveAsTextFile(outdir)
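For reference, before paying for the full M*N pass I can sanity-check the trained model on a handful of users with the per-user variant, recommendProducts. A minimal sketch (sampleUsers is just a hand-picked list of user ids, not from my real data):

// Score a few users against all N items; cheap enough for a quick check.
val sampleUsers = Seq(1, 42, 12345)
sampleUsers.foreach { u =>
  val top = model.recommendProducts(u, 10)   // Array[Rating], highest score first
  println(s"user $u -> " + top.map(r => s"${r.product}:${r.rating}").mkString(", "))
}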
Rainfield asked Aug 23 '16

1 Answer

@Jack Lei: Did you find the answer to this? I tried a few things myself, but they only helped a little.

For example, I tried:

javaSparkContext.setCheckpointDir("checkpoint/");

This helps because it avoids repeated computation in between iterations.
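In Scala (to match the question's code) the equivalent would be something like the sketch below; the "checkpoint/" path is just a placeholder, and sc is the active SparkContext:

// Set a checkpoint directory so the ALS iterations (together with
// setCheckpointInterval(5) in the question) can truncate the RDD lineage.
sc.setCheckpointDir("checkpoint/")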

I also tried adding more memory per executor and more Spark memory overhead:

--conf spark.driver.maxResultSize=5g --conf spark.yarn.executor.memoryOverhead=4000
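If you prefer to set these in code rather than on the command line, the same properties can go on the SparkConf (a sketch using the property names and values above):

import org.apache.spark.SparkConf

// Same settings applied programmatically instead of via --conf flags.
val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "5g")
  .set("spark.yarn.executor.memoryOverhead", "4000")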
Shadow answered Sep 22 '22