 

Recursively fetch file contents from subdirectories using sc.textFile

It seems that SparkContext.textFile expects only files to be present in the given directory location - it does not

  • (a) recurse, or
  • (b) even support directories (it tries to read directories as files)

Any suggestions on how to structure the recursion - ideally something simpler than building the recursive file list / descent logic manually?

Here is the use case: files under

/data/tables/my_table

I want to be able to read, via an HDFS call, all the files at all directory levels under that parent directory.
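
For reference, here is roughly the manual descent logic I was hoping to avoid - a minimal sketch using the Hadoop FileSystem API from the Scala shell (listFilesRecursively is an illustrative helper, not a built-in):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Walk the tree below `path`, collecting every regular file.
    // (On very old Hadoop versions, isDirectory is isDir.)
    def listFilesRecursively(fs: FileSystem, path: Path): Seq[Path] =
      fs.listStatus(path).toSeq.flatMap { status =>
        if (status.isDirectory) listFilesRecursively(fs, status.getPath)
        else Seq(status.getPath)
      }

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val files = listFilesRecursively(fs, new Path("/data/tables/my_table"))

    // sc.textFile accepts a comma-separated list of paths
    val rdd = sc.textFile(files.mkString(","))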

UPDATE

sc.textFile() invokes the Hadoop FileInputFormat via the subclass TextInputFormat. Inside, the logic to read directories does exist - i.e. first detecting whether an entry is a directory and, if so, descending:

<!-- language: java -->
    for (FileStatus globStat : matches) {
      if (globStat.isDir()) {
        for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
          result.add(stat);
        }
      } else {
        result.add(globStat);
      }
    }

However, when invoking sc.textFile there are "not a file" errors on directory entries. This behavior is confusing, given that support for handling directories appears to be in place.

WestCoastProjects asked Mar 02 '15

2 Answers

I was looking at an old version of FileInputFormat; the recursive descent is controlled by a configuration setting.

BEFORE setting the recursive config mapreduce.input.fileinputformat.input.dir.recursive:

    scala> sc.textFile("dev/*").count
    java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

The default is null (not set), which is evaluated as "false":

    scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
    res1: String = null

AFTER:

Now set the value:

    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

Now retry the recursive operation:

    scala> sc.textFile("dev/*/*").count
    ...
    res5: Long = 3481

So it works.

Update: added an extra /* to the glob for full recursion, per the comment by @Ben.
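
For completeness, the same sequence applied to the original use case in one place - a sketch assuming the flag makes FileInputFormat descend fully, as it does in current Hadoop versions (the path is illustrative):

    // Set before creating the RDD
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

    // Directory entries under the input path are now descended into
    // instead of raising "Not a file"
    val lines = sc.textFile("/data/tables/my_table")
    println(lines.count())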

WestCoastProjects answered Nov 10 '22


I have found that these parameters must be set in the following way:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
Paul answered Nov 10 '22