I'm currently trying to train a set of Word2Vec Vectors on the UMBC Webbase Corpus (around 30GB of text in 400 files).
I often run into out of memory situations even on 100 GB plus Machines. I run Spark in the application itself. I tried to tweak a little bit, but I am not able to perform this operation on more than 10 GB of textual data. The clear bottleneck of my implementation is the union of the previously computed RDDs, that where the out of memory exception comes from.
Maybe one you have the experience to come up with a more memory efficient implementation than this:
object SparkJobs {
val conf = new SparkConf()
.setAppName("TestApp")
.setMaster("local[*]")
.set("spark.executor.memory", "100g")
.set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)
def trainBasedOnWebBaseFiles(path: String): Unit = {
val folder: File = new File(path)
val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par
var i = 0;
val props = new Properties();
props.setProperty("annotators", "tokenize, ssplit");
props.setProperty("nthreads","2")
val pipeline = new StanfordCoreNLP(props);
//preprocess files parallel
val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => {
//preprocess line of file
println(file.getName() +"-" + file.getTotalSpace())
val rdd_lines: Iterator[Option[Seq[String]]] = for (line <- Source.fromFile(file,"utf-8").getLines) yield {
//performs some preprocessing like tokenization, stop word filtering etc.
processWebBaseLine(pipeline, line)
}
val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList
println(s"File $i done")
i = i + 1
sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER)
})
val rdd_file = sc.union(training_data_raw.seq)
val starttime = System.currentTimeMillis()
println("Start Training")
val word2vec = new Word2Vec()
word2vec.setVectorSize(100)
val model: Word2VecModel = word2vec.fit(rdd_file)
println("Training time: " + (System.currentTimeMillis() - starttime))
ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH)
}}
}
There are three ways to create an RDD in Spark. Parallelizing already existing collection in driver program. Referencing a dataset in an external storage system (e.g. HDFS, Hbase, shared file system). Creating RDD from already existing RDDs.
As RDDs are the main abstraction in Spark, RDDs are cached using persist() or the cache() method. All the RDD is stored in-memory, while we use cache() method. As RDD stores the value in memory, the data which does not fit in memory is either recalculated or the excess data is sent to disk.
You can save the RDD using saveAsObjectFile and saveAsTextFile method. Whereas you can read the RDD using textFile and sequenceFile function from SparkContext.
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
Like Sarvesh points out in the comments, it is probably too much data for a single machine. Use more machines. We typically see the need for 20–30 GB of memory to work with a file of 1 GB. By this (extremely rough) estimate you'd need 600–800 GB of memory for the 30 GB input. (You can get a more accurate estimate by loading a part of the data.)
As a more general comment, I'd suggest you avoid using rdd.union
and sc.parallelize
. Use instead sc.textFile
with a wildcard to load all files into a single RDD.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With