I'm trying to use Spark MLlib LDA to summarize my document corpus.
My problem setting is as follows.
I have 16 servers (each has 20 cores and 128GB memory).
When I execute LDA with OnlineLDAOptimizer, it fails with an error telling me to increase spark.driver.maxResultSize, like:
Total size of serialized results of 11 tasks (1302 MB) is bigger than spark.driver.maxResultSize
I increased spark.driver.maxResultSize to 120GB (and also spark.driver.memory to 120GB) and re-ran LDA, but no luck. It still says:
Total size of serialized results of 11 tasks (120.1 GB) is bigger than spark.driver.maxResultSize
I tried another dataset with about 100,000 unique words and it worked.
So, how can I estimate the memory usage of Spark MLlib LDA? I couldn't find any specification in the official documentation.
Note that I used sparse vectors to construct the document RDD[(Long, Vector)] passed to LDA.run(), but I don't know whether Spark LDA handles the sparse format correctly internally.
(edited) I used the Scala version of LDA, not the Python version.
This may be a related issue, but no clear answer was given: Spark LDA woes - prediction and OOM questions
(edited)
This is a snippet of my code (gist). https://gist.github.com/lucidfrontier45/11420721c0078c5b7415
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel}
import org.apache.spark.mllib.linalg.Vectors

def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
  val src = sc.textFile(args.fname, minPartitions = args.n_partitions)
    .map(_.split("\t"))
    .flatMap {
      // input file's format is (user_id, product_name, count)
      case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
      case _ => None
    }.persist()

  // maps to convert user_id and product_name into unique sequential ids
  val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
  val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
  val inverse_userid_map = userid_map.map(_.swap)

  // broadcast to speed up the RDD map operations
  val b_userid_map = sc.broadcast(userid_map)
  val b_productid_map = sc.broadcast(productid_map)
  val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

  // remap the raw ids to the sequential ids
  val transformed_src = src.map { case (u, p, r) =>
    (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
  }

  println("unique items = %d".format(b_productid_map.value.size))

  // prepare the LDA input RDD[(Long, Vector)]
  val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
    .groupByKey()
    .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }
    .persist()

  documents.count()
  src.unpersist()

  // run online variational LDA
  val ldamodel = new LDA()
    .setK(args.k)
    .setMaxIterations(args.n_iter)
    .setOptimizer("online")
    .run(documents)
    .asInstanceOf[LocalLDAModel]

  val result = ldamodel.topicDistributions(documents)
    .map { case (i, v) =>
      val u = b_inverse_userid_map.value(i)
      "%d,%s".format(u, v.toArray.mkString(","))
    }

  result.saveAsTextFile(args.out)
}
Actually, I use LDA for dimensionality reduction of transaction data. My data is in the format (u, p, r), where u is a user id, p is a product name, and r is the number of times user u interacted with p. In this case a user corresponds to a document and a product to a word. Since user ids and product names are arbitrary strings, I converted them to unique sequential integers before submitting the data to LDA.
Thank you.
There are three common causes for this problem, which may work independently or in tandem.
1. The job returns a lot of data to the driver using something like collect. Alas, some of the SparkML code does this. If you cannot blame (2) or (3) below for the problem, it is likely the outcome of how your data interacts with the OnlineLDAOptimizer implementation; see the rough size estimate sketched after this list.
2. The job involves a large number of tasks, each of which returns results to the driver as part of Spark's job management (as opposed to via something like collect). Check the number of tasks in the Spark UI. Also see Exceeding `spark.driver.maxResultSize` without bringing any data to the driver. Are org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults or org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask on the stack trace?
3. An estimation error: Spark significantly over-estimates the size of the data that is about to be returned to the driver and throws this error to prevent the cluster's driver from an OOM. See What is spark.driver.maxResultSize? One way to test for this is to set spark.driver.maxResultSize to 0 (no limit) and see what happens; a minimal configuration sketch for that test follows this list.
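Regarding (1), here is a rough back-of-envelope sanity check of the numbers in your error message. It assumes that each task's partial result is dominated by a dense k x vocabSize matrix of doubles, which is an assumption about the OnlineLDAOptimizer internals rather than anything the documentation guarantees; the topic count and vocabulary size below are hypothetical.

// Back-of-envelope estimate, runnable in the Scala REPL.
// ASSUMPTION: each task's partial result is roughly a dense k x vocabSize
// matrix of Doubles (an assumption about OnlineLDAOptimizer internals,
// not a documented contract).
val k = 100                  // hypothetical topic count
val vocabSize = 13000000L    // hypothetical number of unique products (words)
val numTasks = 11            // from the error message in the question
val bytesPerTask = k * vocabSize * 8L               // 8 bytes per Double
val totalGB = numTasks * bytesPerTask / math.pow(1024, 3)
// totalGB comes out around 107, i.e. the same order of magnitude as the
// "120.1 GB" the error reports for 11 tasks.

Under that assumption, results of this size are expected when the vocabulary runs into the millions, which would also explain why your ~100,000-word dataset worked: shrinking the vocabulary (or the topic count) shrinks the per-task results proportionally.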
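And for (3), a minimal sketch of the diagnostic mentioned above: lifting the limit entirely to see whether the failure comes from a size over-estimate. The app name is arbitrary, and an unlimited result size can OOM the driver, so treat this as a test, not a fix.

import org.apache.spark.{SparkConf, SparkContext}

// Diagnostic only: "0" removes the cap on serialized task results, so the
// driver JVM itself can now run out of memory. Keep spark.driver.memory
// large (it must be set via spark-submit or spark-defaults, since it
// cannot be changed after the driver JVM has started).
val conf = new SparkConf()
  .setAppName("lda-maxresultsize-test")     // hypothetical app name
  .set("spark.driver.maxResultSize", "0")   // 0 means unlimited

implicit val sc = new SparkContext(conf)
// ...then run the LDA job from the question's snippet as usual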
Hope this helps!