
Caching factor of MatrixFactorizationModel in PySpark

After loading a saved MatrixFactorizationModel, I get these warnings:

MatrixFactorizationModelWrapper: Product factor does not have a partitioner. Prediction on individual records could be slow.
MatrixFactorizationModelWrapper: Product factor is not cached. Prediction could be slow.

Indeed, the computation is slow and will not scale well.

How do I set a partitioner and cache the product factor?

Here is code that demonstrates the problem:

from pyspark import SparkContext
import sys

sc = SparkContext("spark://hadoop-m:7077", "recommend")    
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
model = MatrixFactorizationModel.load(sc, "model")
model.productFeatures.cache()

I get:

Traceback (most recent call last):
  File "/home/me/recommend.py", line 7, in <module>
    model.productFeatures.cache()
AttributeError: 'function' object has no attribute 'cache'

mich asked Sep 27 '22 22:09


People also ask

How much data can we cache in Spark?

By default, it is 0.6 x (JVM heap space - 300 MB).
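As a rough illustration of that default formula (the 4 GB heap below is an assumed example value, not anything from the question):

```python
# Spark's default unified-memory pool: spark.memory.fraction (0.6)
# applied to the JVM heap minus 300 MB of reserved memory.
heap_mb = 4096          # assumed example: a 4 GB executor heap
reserved_mb = 300
memory_fraction = 0.6
usable_mb = memory_fraction * (heap_mb - reserved_mb)
print(usable_mb)        # MB shared by execution and storage
```

With a 4 GB heap this leaves roughly 2.28 GB for execution and storage combined.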

What is caching in Pyspark?

In Spark SQL, caching is a common technique for reusing some computation. It has the potential to speed up other queries that use the same data, but there are some caveats to keep in mind if we want to achieve good performance.

How do I know if a data frame is cached?

You can call getStorageLevel().useMemory on the RDD (or check a DataFrame's storageLevel property) to find out whether the dataset is in memory.


1 Answer

Concerning the caching, as I wrote in the comment box, you can cache your RDD as follows:

rdd.cache() # the same call works in Scala, Java and Python

EDIT: The userFeatures and the productFeatures are both of type RDD[(Int, Array[Double])]. (Ref. Official Documentation)

To cache productFeatures, you can do the following:

model.productFeatures().cache() 

Of course, this assumes that the loaded model is called model.

Example :

r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)

ratings = sc.parallelize([r1, r2, r3])

from pyspark.mllib.recommendation import ALS

model = ALS.trainImplicit(ratings, 1, seed=10)
model.predict(2, 2)

feats = model.productFeatures()

type(feats)

>> MapPartitionsRDD[137] at mapPartitions at PythonMLLibAPI.scala:1074
feats.cache()

As for the warning concerning the partitioner: even if you partition your model, say by feature with .partitionBy() to balance it, it would still be too expensive performance-wise.

There is a JIRA ticket (SPARK-8708) concerning this issue that should be resolved in the next release of Spark (1.5).

Nevertheless, if you want to learn more about partitioning algorithms, I invite you to read the discussion in SPARK-3717, which argues about partitioning by features within the DecisionTree and RandomForest algorithms.

eliasah answered Oct 21 '22 22:10