I have a Spark job that needs to compute movie content-based similarities. There are 46k movies. Each movie is represented by a set of SparseVectors (each vector is a feature vector for one of the movie's fields such as Title, Plot, Genres, Actors, etc.). For Actors and Genres, for example, the vector shows whether a given actor is present (1) or absent (0) in the movie.
The task is to find top 10 similar movies for each movie. I managed to write a script in Scala that performs all those computations and does the job. It works for smaller sets of movies such as 1000 movies but not for the whole dataset (out of memory, etc.).
The way I do this computation is by using a cross join on the movies dataset. Then reduce the problem by only taking rows where movie1_id < movie2_id. Still the dataset at this point will contain 46000^2/2 rows which is 1058000000. And each row has significant amount of data.
Then I calculate similarity score for each row. After similarity is calculated I group the results where movie1_id is same and sort them in descending order by similarity score using a Window function taking top N items (similar to how it's described here: Spark get top N highest score results for each (item1, item2, score)).
The question is - can it be done more efficiently in Spark? E.g. without having to perform a crossJoin?
And another question - how does Spark deal with such huge Dataframes (1058000000 rows consisting of multiple SparseVectors)? Does it have to keep all this in memory at a time? Or does it process such dataframes piece by piece somehow?
I'm using the following function to calculate similarity between movie vectors:
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)
var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
val index: Int = a.indexAt(offset)
val value: Double = a.valueAt(offset)
dot += value * b(index)
offset += 1
}
val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)
if (maga == 0 || magb == 0)
return 0
else
return dot / (maga * magb)
}
Each row in the Dataframe consists of two joined classes:
final case class MovieVecData(imdbID: Int,
Title: SparseVector,
Decade: SparseVector,
Plot: SparseVector,
Genres: SparseVector,
Actors: SparseVector,
Countries: SparseVector,
Writers: SparseVector,
Directors: SparseVector,
Productions: SparseVector,
Rating: Double
)
It can be done more efficiently, as long as you are fine with approximations, and don't require exact results (or exact number or results).
Similarly to my answer to Efficient string matching in Apache Spark you can use LSH, with:
BucketedRandomProjectionLSH
to approximate Euclidean distance.MinHashLSH
to approximate Jaccard Distance.If feature space is small (or can be reasonably reduced) and each category is relatively small you can also optimize your code by hand:
explode
feature array to generate #features records from a single record.A minimal example would be (consider it to be a pseudocode):
import org.apache.spark.ml.linalg._
// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)
val df = Seq(
(1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
(2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
(3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
(4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
(5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")
val possibleMatches = df
.withColumn("key", explode(indices($"features")))
.transform(df => df.alias("left").join(df.alias("right"), Seq("key")))
val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)
possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct
Note that both solutions are worth the overhead only if hashing / features are selective enough (and optimally sparse). In the example shown above you'd compare only rows inside set {1, 2, 3} and {4, 5}, never between sets.
However in the worst case scenario (M records, N features) we can make N M2 comparisons, instead of M2
Another thought.. Given that your matrix is relatively small and sparse, it can fit in memory using breeze CSCMatrix[Int].
Then, you can compute co-occurrences using A'B (A.transposed * B) followed by a TopN selection of the LLR (logLikelyhood ratio) of each pairs. Here, since you keep only 10 top items per row, the output matrix will be very sparse as well.
You can lookup the details here:
https://github.com/actionml/universal-recommender
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With