
Spark LDA consumes too much memory

I'm trying to use Spark MLlib LDA to summarize my document corpus.

My problem setting is as follows.

  • about 100,000 documents
  • about 400,000 unique words
  • 100 clusters (topics)

I have 16 servers (each has 20 cores and 128GB memory).

When I run LDA with OnlineLDAOptimizer, it fails with an out-of-memory error that suggests increasing spark.driver.maxResultSize: Total size of serialized results of 11 tasks (1302 MB) is bigger than spark.driver.maxResultSize

I increased spark.driver.maxResultSize to 120GB (and also spark.driver.memory to 120GB) and re-ran LDA, but no luck. It still says: Total size of serialized results of 11 tasks (120.1 GB) is bigger than spark.driver.maxResultSize

I tried another dataset with about 100,000 unique words and it worked.

So, how can I estimate the memory usage when using Spark mllib LDA? I couldn't find any specification in the official documentation.

Note that I used sparse vectors to construct the document RDD[(Long, Vector)] passed to LDA.run(), but I don't know whether Spark LDA handles the sparse format correctly internally.

(edited) I used the Scala version of LDA, not the Python version.

This may be a related question, but no clear answer was given: Spark LDA woes - prediction and OOM questions

(edited)

This is a snippet of my code (gist). https://gist.github.com/lucidfrontier45/11420721c0078c5b7415

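import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel}
import org.apache.spark.mllib.linalg.Vectors

// RunArgs is the job's own argument holder (fname, n_partitions, k, n_iter, out); see the gist
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {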
    val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
        .flatMap {
            // each line has four tab-separated fields; the first three are
            // (user_id, product_name, count) and the fourth is ignored
            case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
            case _ => None
        }.persist()

    // Maps that convert user_id and product_name into unique sequential ids
    val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
    val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
    val inverse_userid_map = userid_map.map(_.swap)

    // broadcast to speed up the RDD map operation
    val b_userid_map = sc.broadcast(userid_map)
    val b_productid_map = sc.broadcast(productid_map)
    val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

    // run map
    val transformed_src = src.map { case (u, p, r) =>
        (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
    }

    println("unique items = %d".format(b_productid_map.value.size))

    // prepare the LDA input RDD[(Long, Vector)]
    val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
        .groupByKey()
        .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()

    documents.count()
    src.unpersist()

    // run Online Variational LDA
    val ldamodel = new LDA()
        .setK(args.k)
        .setMaxIterations(args.n_iter)
        .setOptimizer("online")
        .run(documents)
        .asInstanceOf[LocalLDAModel]


    val result = ldamodel.topicDistributions(documents)
        .map { case (i, v) =>
            val u = b_inverse_userid_map.value(i)
            "%d,%s".format(u, v.toArray.mkString(","))
        }
    result.saveAsTextFile(args.out)
}

Actually, I use LDA for dimensionality reduction of transaction data. My data is in the format (u, p, r), where u is the user id, p is the product name, and r is the number of times user u interacted with p. In this case a user corresponds to a document and a product to a word. Since user ids and product names are arbitrary strings, I converted them to unique sequential integers before passing them to LDA.

Thank you.

Du Shiqiao asked Mar 14 '16 03:03


1 Answer

There are three common causes for this problem, which may work independently or in tandem.

  1. The job returns a lot of data to the driver using something like collect. Alas, some of the SparkML code does this. If you cannot blame (2) or (3) below for the problem, it's likely the outcome of how your data interacts with the OnlineLDAOptimizer implementation.

  2. The job involves a large number of tasks, each of which returns results to the driver as part of Spark's job management (as opposed to an explicit call such as collect). Check the number of tasks in the Spark UI. Also see "Exceeding `spark.driver.maxResultSize` without bringing any data to the driver". Do org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults or org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask appear in the stack trace?

  3. An estimation error: Spark significantly over-estimates the size of the data that's about to be returned to the driver and throws this error to keep the cluster's driver from running out of memory. See "What is spark.driver.maxResultSize?". One way to test for this is to set spark.driver.maxResultSize to 0 (no limit) and see what happens.
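To judge whether cause 1 or 2 is plausible for a given topic count and vocabulary size, a back-of-envelope calculation may help. The sketch below rests on an assumption that is not confirmed here and should be checked against your Spark version: that each task of the online optimizer sends back one dense k × vocabSize matrix of doubles. The object and method names are hypothetical, and the numbers ignore serialization overhead and compression.

```scala
object LdaResultSizeEstimate {
  // Bytes needed for one dense k x vocabSize matrix of 8-byte doubles.
  // ASSUMPTION: each task returns one such matrix; verify for your Spark version.
  def denseMatrixBytes(k: Long, vocabSize: Long): Long = k * vocabSize * 8L

  // Smallest number of task results whose combined size exceeds the limit.
  def tasksToExceed(perTaskBytes: Long, limitBytes: Long): Long =
    limitBytes / perTaskBytes + 1

  def main(args: Array[String]): Unit = {
    // Spark's default spark.driver.maxResultSize is 1g.
    val oneGiB = 1024L * 1024L * 1024L
    // The two vocabulary sizes mentioned in the question, with k = 100.
    for (vocab <- Seq(100000L, 400000L)) {
      val bytes = denseMatrixBytes(100L, vocab)
      println(s"vocabSize=$vocab: $bytes bytes per task, " +
        s"${tasksToExceed(bytes, oneGiB)} task results exceed 1 GiB")
    }
  }
}
```

Under that assumption, 100 topics over 400,000 words come to 320,000,000 bytes per task, so a handful of tasks would already pass the default limit. To try the test from cause 3 without changing code, the setting can be passed at submit time with `spark-submit --conf spark.driver.maxResultSize=0`.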

Hope this helps!

Sim answered Oct 30 '22 01:10