 

Spark: How to transform a Seq of RDDs into an RDD

I'm just starting out with Spark & Scala.

I have a directory with multiple files in it, and I successfully load them using

sc.wholeTextFiles(directory)

Now I want to go one level up: I have a directory that contains subdirectories, which in turn contain files. My goal is to get an RDD[(String,String)], where each pair holds a file's name and its content, so I can move forward.

I tried the following:

val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))

but I got a Seq[RDD[(String,String)]]. How do I transform this Seq into an RDD[(String,String)]?

Or maybe I'm not doing things right and I should try a different approach?

Edit: added code

// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// inputHDFS is a Seq[RDD[(String, String)]]
//    val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)

// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)
asked Jan 09 '23 by Stephane Maarek


2 Answers

You can reduce on the Seq like this (concatenating the RDDs with ++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)

A few more details on why we can apply reduce here:

  • ++ is associative: it does not matter whether you compute rdda ++ (rddb ++ rddc) or (rdda ++ rddb) ++ rddc
  • the Seq is assumed to be nonempty (otherwise fold would be a better choice; it would require an empty RDD[(String, String)] as the initial accumulator, as in the sketch after this list).
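For the empty-Seq case, a minimal sketch of the fold variant, using sc.emptyRDD as the neutral element (input is the Seq[RDD[(String, String)]] from the question):

val empty: RDD[(String, String)] = sc.emptyRDD[(String, String)]
// fold tolerates an empty Seq, because the accumulator alone supplies the result
val folded: RDD[(String, String)] = input.fold(empty)(_ ++ _)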

Depending on the exact type of the Seq, you might get a StackOverflowError, so be careful and test with a larger collection, though for the standard library I think it is safe.

answered Jan 29 '23 by Gábor Bakos


You should use the union method provided by SparkContext:

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds) 
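Applied to the question's setting, this likely avoids the StackOverflowError: sc.union builds a single union over the whole Seq instead of the deeply nested lineage that ~800 chained ++ calls produce. A sketch reusing the names from the question's edit (allFiles is a new name):

val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// one flat union over the Seq, rather than hundreds of chained unions
val allFiles: RDD[(String, String)] = sc.union(inputHDFS)
println(allFiles.count)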
answered Jan 29 '23 by raam86