I have an RDD[String], wordRDD. I also have a function that creates an RDD[String] from a string/word. I would like to create a new RDD for each string in wordRDD. Here are my attempts:
1) Failed because Spark does not support nested RDDs:
var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})
2) Failed (possibly due to scope issue?):
var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray) {
  newRDD = sc.union(newRDD, (new MyClass(w)).myFunction())
}
My ideal result would look like:
// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)
// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')
// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)
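For context, here is a minimal sketch of what MyClass and myFunction might look like. This is a hypothetical reconstruction, assuming myFunction parallelizes the one-letter-removed variants; the question does not show the actual implementation:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical reconstruction of MyClass; the real implementation is not shown.
class MyClass(word: String)(implicit sc: SparkContext) {
  // Returns an RDD of every variant of `word` with one letter removed.
  def myFunction(): RDD[String] =
    sc.parallelize(
      word.indices.map(i => word.substring(0, i) + word.substring(i + 1))
    )
}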
I found a relevant question here: Spark when union a lot of RDD throws stack overflow error, but it didn't address my issue.
Use flatMap to get the RDD[String] you want:

var allWords = wordRDD.flatMap { word =>
  (new MyClass(word)).myFunction().collect()
}

Be aware, though, that myFunction returns an RDD, and Spark cannot run RDD operations (including collect()) inside another RDD's transformation. This pattern will fail unless myFunction is rewritten to avoid touching the SparkContext; the next answer shows such a rewrite.
You cannot create an RDD from within another RDD.

However, it is possible to rewrite your function myFunction: String => RDD[String], which generates all words from the input with one letter removed, into another function modifiedFunction: String => Seq[String], so that it can be used from within an RDD transformation. That way, it will also be executed in parallel on your cluster. With modifiedFunction, you can obtain the final RDD containing all words by simply calling wordRDD.flatMap(modifiedFunction).

The crucial point is to use flatMap (to map and flatten the transformations):
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val input = sc.parallelize(Seq("apple", "ananas", "banana"))

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
    val result = input.flatMap(modifiedFunction)
    result.collect().foreach(println)
    sc.stop()
  }

  // For each index of the word, drop the character at that position.
  def modifiedFunction(word: String): Seq[String] = {
    word.indices map { index =>
      word.substring(0, index) + word.substring(index + 1)
    }
  }
}
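To see why flatMap rather than map is the right choice here, compare the result types. This is a minimal sketch, reusing the input and modifiedFunction names from above:

// map keeps the nesting: one Seq[String] per input word.
val nested: org.apache.spark.rdd.RDD[Seq[String]] = input.map(modifiedFunction)

// flatMap flattens those Seqs into a single RDD of words.
val flat: org.apache.spark.rdd.RDD[String] = input.flatMap(modifiedFunction)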