Inside a given directory I have many different folders, and inside each folder I have Hadoop files (part_001, etc.).
directory
  -> folder1
    -> part_001...
    -> part_002...
  -> folder2
    -> part_001...
  ...
Given the directory, how can I recursively read the content of all folders inside this directory and load this content into a single RDD in Spark using Scala?
I found this, but it does not recurse into sub-folders (I am using import org.apache.hadoop.mapreduce.lib.input):
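import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

var job: Job = null
try {
  job = Job.getInstance()
  // Point the job at the top-level S3 directory and ask for recursive listing
  FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3))
  FileInputFormat.setInputDirRecursive(job, true)
} catch {
  case ioe: IOException => ioe.printStackTrace(); System.exit(1)
}
// Keep only the line contents (values), dropping the byte-offset keys
val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values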
I also found this web page that uses SequenceFile, but again I don't understand how to apply it to my case.
If you are using Spark, you can do this using wildcards as follows:
scala> sc.textFile("path/*/*")
sc is the SparkContext. If you are using spark-shell, it is initialized by default; if you are writing your own program, you will have to instantiate a SparkContext yourself.
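For a standalone program, a minimal sketch of the same wildcard read might look like the following. The object name, the helper oneLevelDown, the app name, and the local master are placeholders for illustration, not part of the original answer. Note that the pattern matches files exactly one folder below the root; deeper nesting would need more `/*` segments or the recursive flag.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReadNestedFolders {
  // Hypothetical helper: glob matching every file one folder below `root`,
  // e.g. root/folder1/part_001
  def oneLevelDown(root: String): String =
    s"${root.stripSuffix("/")}/*/*"

  // Loads all matched files into a single RDD of lines.
  def readAll(sc: SparkContext, root: String): RDD[String] =
    sc.textFile(oneLevelDown(root))

  def main(args: Array[String]): Unit = {
    // App name and master are illustrative; adjust for your cluster.
    val conf = new SparkConf().setAppName("read-nested-folders").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val lines = readAll(sc, args(0)) // args(0): the top-level directory
      println(s"Read ${lines.count()} lines")
    } finally {
      sc.stop()
    }
  }
}
```

Keeping the glob construction in its own function makes it easy to swap in a different pattern without touching the Spark setup.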
Be careful with the following flag:
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
> res6: String = null
You should set this flag to true:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
I have found that the parameters must be set in this way:
.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
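As a rough sketch, the two settings above could be applied while building the SparkConf of a standalone program. The object name, app name, and local master below are assumptions made for illustration. The `spark.hadoop.` prefix is how Spark forwards a property to the underlying Hadoop configuration, which is why the second key differs from the one set directly on sc.hadoopConfiguration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RecursiveReadConf {
  // Builds a SparkConf carrying the two settings quoted above.
  // App name and master are illustrative placeholders.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("recursive-read")
      .setMaster("local[*]")
      .set("spark.hive.mapred.supports.subdirectories", "true")
      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(build())
    try {
      // Spark copies spark.hadoop.* entries into the Hadoop configuration
      // with the prefix stripped, so the flag can be inspected here.
      println(sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive"))
    } finally {
      sc.stop()
    }
  }
}
```

Setting the values on the SparkConf before the context is created means they are in place for every RDD the program builds, rather than only after a later call on sc.hadoopConfiguration.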