After loading a saved MatrixFactorizationModel I get these warnings:

MatrixFactorizationModelWrapper: Product factor does not have a partitioner. Prediction on individual records could be slow.

MatrixFactorizationModelWrapper: Product factor is not cached. Prediction could be slow.

Indeed, the computation is slow and will not scale well. How do I set a partitioner and cache the Product factor?
Adding code that demonstrates the problem:

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc = SparkContext("spark://hadoop-m:7077", "recommend")

model = MatrixFactorizationModel.load(sc, "model")
model.productFeatures.cache()  # this line raises the error below
I get:

Traceback (most recent call last):
  File "/home/me/recommend.py", line 7, in <module>
    model.productFeatures.cache()
AttributeError: 'function' object has no attribute 'cache'
As a side note, you can call getStorageLevel().useMemory on an RDD to find out whether the dataset is held in memory.
Concerning the caching, as I wrote in the comment box, you can cache your RDD as follows:

rdd.cache()  # the call is the same in Scala, Java and Python
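If memory is at a premium, a variant is to persist with an explicit storage level rather than cache()'s default of MEMORY_ONLY; a minimal sketch:

from pyspark import StorageLevel

# spill partitions to disk when the factors do not fit in memory
rdd.persist(StorageLevel.MEMORY_AND_DISK)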
EDIT: userFeatures and productFeatures are both of type RDD[(Int, Array[Double])]. (Ref. Official Documentation)
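A quick, purely illustrative way to confirm that element type from PySpark (the variable names here are my own):

# each element is an (id, factor vector) pair
pair = model.productFeatures().first()
print(type(pair[0]), type(pair[1]))  # e.g. an int id and an array of doubles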
To cache productFeatures, you can do the following:

model.productFeatures().cache()

Of course, this assumes that the loaded model is called model.
Example:

from pyspark.mllib.recommendation import ALS

# (user, product, rating) triples
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])

# train a rank-1 implicit-feedback model
model = ALS.trainImplicit(ratings, 1, seed=10)
model.predict(2, 2)

feats = model.productFeatures()  # note the parentheses: it is a method
feats
>> MapPartitionsRDD[137] at mapPartitions at PythonMLLibAPI.scala:1074
feats.cache()
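To double-check that the cache request took effect, you can inspect the storage level; note that the level is set immediately, while the data itself is only materialized on the first action:

print(feats.getStorageLevel().useMemory)  # True once cache() has been called
feats.count()  # an action, which actually materializes the cached partitions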
As for the warning concerning the partitioner: even if you partition your model, say by feature with .partitionBy() to balance it, it would still be too expensive performance-wise.
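For illustration only, this is what explicit partitioning of the factor RDD could look like; the partition count (100) is an arbitrary assumption, and per the caveat above it will not make single-record predictions fast:

# hash-partition the (id, features) pairs by key and cache the result
feats = model.productFeatures().partitionBy(100).cache()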
There is a JIRA ticket (SPARK-8708) concerning this issue that should be resolved in the next release of Spark (1.5).
Nevertheless, if you want to learn more about partitioning algorithms, I invite you to read the discussion in SPARK-3717, which argues about partitioning by features within the DecisionTree and RandomForest algorithms.