Given a large text file, I want to extract the character n-grams using Apache Spark (doing the task in parallel).
Example input (two lines of text):
line 1: (Hello World, it)
line 2: (is a nice day)
Output n-grams: Hel - ell - llo - lo_ - o_W - _Wo - Wor - orl - rld - ld, - d,_ - ,_i - _it - it_ - t_i - _is - ... and so on. So I want the return value to be an RDD[String], with each string containing one n-gram.
Notice that the newline is treated as a space in the output n-grams. I put each line in parentheses for clarity. Also, to be clear, the text is not a single entry in an RDD: I read the file using the sc.textFile() method.
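For reference, here is a minimal sequential sketch in plain Scala (no Spark) of the output described above. It assumes the lines are joined with a single space standing in for the newline and that spaces are shown as `_`; the names `CharNGramsReference` and `charNGrams` are hypothetical.

```scala
object CharNGramsReference {
  // Hypothetical helper (not from the question): joins the lines with a space,
  // which stands in for the newline, shows spaces as '_', and returns every
  // window of n consecutive characters.
  def charNGrams(lines: Seq[String], n: Int): Vector[String] =
    lines
      .mkString(" ")
      .replace(' ', '_')
      .sliding(n)
      .filter(_.length == n) // drop the single short window produced when the text is shorter than n
      .toVector

  def main(args: Array[String]): Unit = {
    val grams = charNGrams(Seq("Hello World, it", "is a nice day"), 3)
    println(grams.mkString(" - "))
  }
}
```

On the two example lines this yields 27 trigrams, including the cross-line ones `it_`, `t_i` and `_is`.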
The main idea is to take all the lines within each partition and combine them into one long String, joined by spaces. Next, we replace " " with "_" and call sliding on this string to create the trigrams for each partition in parallel.
Note: The resulting trigrams are not exhaustive, because the trigrams that span a partition boundary (the end of one partition and the start of the next) are missed; for trigrams that is up to three per boundary, counting the line break between the partitions. Given that each partition can be several million characters long, the loss in accuracy should be negligible. The main benefit here is that each partition can be processed in parallel.
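If the boundary loss does matter, one possible fix (not part of the original answer) is to give each partition a short lookahead: a space for the line break, followed by the first n-1 characters of the partitions that come after it. The sketch below simulates partitions with plain Scala collections, assuming partitions are numbered in file order; `ExactPartitionNGrams` and `exactNGrams` are hypothetical names. In Spark, the small `heads` data would presumably be gathered with `mapPartitionsWithIndex` and `collect`, shared with `sc.broadcast`, and consumed in a second `mapPartitionsWithIndex` pass; that wiring is not shown here.

```scala
object ExactPartitionNGrams {
  // Each inner Seq stands in for the lines of one partition, in file order.
  def exactNGrams(partitions: Seq[Seq[String]], n: Int): Vector[String] = {
    // Lines inside a partition are joined with a space, standing in for the newline.
    val texts = partitions.map(_.mkString(" "))
    // First n-1 characters of each partition: the only cross-partition data needed.
    val heads = texts.map(_.take(n - 1))

    texts.indices.toVector.flatMap { i =>
      if (partitions(i).isEmpty) Vector.empty[String] // a partition with no lines contributes nothing
      else {
        // A space for the line break after this partition, then the heads of the
        // following partitions that hold lines, cut to n characters. This completes
        // the windows that start inside this partition or on the line break right
        // after it, so every window is emitted by exactly one partition.
        val lookahead = partitions
          .zip(heads)
          .drop(i + 1)
          .collect { case (lines, head) if lines.nonEmpty => " " + head }
          .mkString
          .take(n)
        (texts(i) + lookahead)
          .replace(' ', '_')
          .sliding(n)
          .filter(_.length == n)
          .toVector
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("Hello World, it"), Seq("is a nice day"))
    println(exactNGrams(parts, 3).mkString(" - "))
  }
}
```

The design choice is to ship only n-1 characters per partition across the boundary, so the extra cost should stay tiny compared with the partitions themselves.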
Here is some toy data. Everything below can be executed in any Spark REPL:
scala> val data = sc.parallelize(Seq("Hello World, it","is a nice day"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12]
scala> val trigrams = data.mapPartitions(_.toList.mkString(" ").replace(" ","_").sliding(3))
trigrams: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14]
Here I collect the trigrams to show what they look like (you probably don't want to do this if your dataset is massive):
scala> val asCollected = trigrams.collect
asCollected: Array[String] = Array(Hel, ell, llo, lo_, o_W, _Wo, Wor, orl, rld, ld,, d,_, ,_i, _it, is_, s_a, _a_, a_n, _ni, nic, ice, ce_, e_d, _da, day)
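The same per-partition logic can be written as a function of type `Iterator[String] => Iterator[String]`, the shape `mapPartitions` expects, with `n` as a parameter instead of a hard-coded 3. The sketch below (hypothetical names `PerPartitionDemo` and `partitionNGrams`) runs it on plain Scala collections, assuming each line landed in its own partition, as the collected output above suggests, and lists the trigrams the per-partition approach misses.

```scala
object PerPartitionDemo {
  // Hypothetical per-partition function: join the partition's lines with a space,
  // show spaces as '_', and slide a window of n characters over the result.
  def partitionNGrams(n: Int)(lines: Iterator[String]): Iterator[String] =
    lines.mkString(" ").replace(' ', '_').sliding(n).filter(_.length == n)

  def main(args: Array[String]): Unit = {
    // Two simulated partitions, one line each.
    val partitions = Seq(Seq("Hello World, it"), Seq("is a nice day"))
    val perPartition = partitions.flatMap(p => partitionNGrams(3)(p.iterator)).toVector
    // The same computation with the whole text treated as a single partition.
    val whole = partitionNGrams(3)(partitions.flatten.iterator).toVector
    // Trigrams that span the partition boundary and are therefore missed.
    println(whole.diff(perPartition).mkString(", "))
  }
}
```

With these two partitions the missed trigrams are `it_`, `t_i` and `_is`, which matches the gap in the collected array above. In the REPL, the function should be usable as `data.mapPartitions(PerPartitionDemo.partitionNGrams(3))`.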
You could use a function like the following:
def n_gram(str:String, n:Int) = (str + " ").sliding(n)
I am assuming the newline has been stripped off when reading the line, so I've added a space to compensate for that. If, on the other hand, the newline is preserved, you could define it as:
def n_gram(str:String, n:Int) = str.replace('\n', ' ').sliding(n)
Using your example:
println(n_gram("Hello World, it", 3).map(_.replace(' ', '_')).mkString(" - "))
would return:
Hel - ell - llo - lo_ - o_W - _Wo - Wor - orl - rld - ld, - d,_ - ,_i - _it - it_
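Applied to every line of the file (in Spark, typically a `flatMap` over the RDD returned by `sc.textFile`), this function produces the n-grams line by line. The plain-Scala sketch below reuses `n_gram` from this answer and simulates the lines with a `Seq` (the wrapper name `PerLineDemo` is hypothetical). It suggests that the appended space recovers `it_`, while n-grams that continue into the next line, such as `t_i` and `_is`, are not produced.

```scala
object PerLineDemo {
  // n_gram as defined above: the appended space stands in for the stripped newline.
  def n_gram(str: String, n: Int): Iterator[String] = (str + " ").sliding(n)

  def main(args: Array[String]): Unit = {
    val lines = Seq("Hello World, it", "is a nice day")
    // Roughly what rdd.flatMap(line => n_gram(line, 3)) would do on an RDD[String].
    val grams = lines.flatMap(line => n_gram(line, 3)).map(_.replace(' ', '_')).toVector
    println(grams.mkString(" - "))
  }
}
```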