 

How to create collection of RDDs out of RDD?

I have an RDD[String], wordRDD. I also have a function that creates an RDD[String] from a string/word. I would like to create a new RDD for each string in wordRDD. Here are my attempts:

1) Failed because Spark does not support nested RDDs:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) Failed (possibly due to scope issue?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

My ideal result would look like:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

I found a relevant question here: Spark when union a lot of RDD throws stack overflow error, but it didn't address my issue.

asked Sep 10 '15 by matsuninja


People also ask

How many ways are there to create RDDs?

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
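The two ways above can be sketched as follows. This is a minimal illustration, assuming Spark is on the classpath and a local master; the HDFS path is hypothetical and shown only for form:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CreateRddDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("create-rdd-demo").setMaster("local[*]"))

  // 1) Parallelize an existing collection in the driver program
  val fromCollection = sc.parallelize(Seq("apple", "banana", "orange"))

  // 2) Reference a dataset in external storage (hypothetical path):
  // val fromStorage = sc.textFile("hdfs:///path/to/words.txt")

  println(fromCollection.count()) // 3

  sc.stop()
}
```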

How many RDDs can cogroup() work on at once?

cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
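A minimal sketch of cogroup over three pair RDDs at once (assumes Spark on the classpath and a local master; the RDD contents are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("cogroup-demo").setMaster("local[*]"))

  val a = sc.parallelize(Seq(1 -> "a1", 2 -> "a2"))
  val b = sc.parallelize(Seq(1 -> "b1", 3 -> "b3"))
  val c = sc.parallelize(Seq(1 -> "c1"))

  // cogroup on three RDDs at once: for each key, the result holds one
  // Iterable of values per input RDD
  val grouped = a.cogroup(b, c).collect().toMap

  println(grouped(1)) // values for key 1 from a, b and c respectively

  sc.stop()
}
```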

How are RDDs stored?

RDDs store data in memory for fast access during computation and provide fault tolerance. An RDD is an immutable distributed collection of records, partitioned across the nodes of the cluster, and it can be operated on in parallel.

How is RDD data distributed?

Distributed: Data in an RDD resides on multiple nodes; it is partitioned across the nodes of a cluster. Lazy evaluation: Data does not get loaded into an RDD when you define it. Transformations are only computed when you call an action, such as count or collect, or save the output to a file system.
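Lazy evaluation can be sketched in a few lines (assumes Spark on the classpath and a local master):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("lazy-demo").setMaster("local[*]"))

  val rdd   = sc.parallelize(Seq("apple", "banana"))
  val upper = rdd.map(_.toUpperCase) // transformation: nothing is computed yet
  val n     = upper.count()          // action: triggers the actual computation

  println(n) // 2

  sc.stop()
}
```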


2 Answers

Use flatMap to get the RDD[String] you desire. Note that a transformation cannot build or collect an RDD on the executors, so this only works if myFunction is changed to return a local collection (for example a Seq[String]) rather than an RDD:

var allWords = wordRDD.flatMap { word =>
  // myFunction must return a local Seq[String] here, not an RDD
  (new MyClass(word)).myFunction()
}
answered Sep 30 '22 by Climbs_lika_Spyder


You cannot create an RDD from within another RDD.

However, you can rewrite your function myFunction: String => RDD[String], which generates all variants of the input word with one letter removed, as a function modifiedFunction: String => Seq[String] that can be used inside an RDD transformation. That way it is also executed in parallel on your cluster. Given modifiedFunction, you obtain the final RDD with all words by simply calling wordRDD.flatMap(modifiedFunction).

The crucial point is to use flatMap (to map and flatten the transformations):

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val input = sc.parallelize(Seq("apple", "ananas", "banana"))

  // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
  val result = input.flatMap(modifiedFunction) 
}

def modifiedFunction(word: String): Seq[String] = {
  word.indices map {
    index => word.substring(0, index) + word.substring(index+1)
  }
}
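Since modifiedFunction is plain Scala with no Spark dependency, it can also be checked on the driver by itself (repeated here so the snippet stands alone):

```scala
object ModifiedFunctionCheck extends App {
  def modifiedFunction(word: String): Seq[String] =
    word.indices.map(i => word.substring(0, i) + word.substring(i + 1))

  // "apple" with one letter removed at each position
  println(modifiedFunction("apple")) // pple, aple, aple, appe, appl
}
```

Note the duplicate "aple": removing either of the two p's yields the same word, so deduplicate afterwards (e.g. with .distinct on the resulting RDD) if that matters.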
answered Sep 30 '22 by Till Rohrmann