 

How to make Spark session read all the files recursively?

Displaying the directories under which JSON files are stored:

$ tree -d try/
try/
├── 10thOct_logs1
├── 11thOct
│   └── logs2
└── Oct
    └── 12th
        └── logs3

Task is to read all logs using SparkSession.

Is there an elegant way to read through all the files in directories and then sub-directories recursively?

The few glob patterns I tried are prone to unintentionally excluding files.

spark.read.json("file:///var/foo/try/<exp>")

+----------+---+-----+-------+
| <exp> -> | * | */* | */*/* |
+----------+---+-----+-------+
| logs1    | y | y   | n     |
| logs2    | n | y   | y     |
| logs3    | n | n   | y     |
+----------+---+-----+-------+

You can see in the above table that none of the three expressions matches all the directories (located at 3 different depths) at the same time. Frankly speaking, I wasn't expecting the exclusion of 10thOct_logs1 while using the third expression */*/*.

This leads me to conclude that only the files or directory paths matching the expression after the last / are treated as an exact match, and everything else is ignored.
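
One way to see where the exclusion comes from is to resolve each pattern with Hadoop's FileSystem.globStatus, which applies roughly the same glob matching Spark's path resolution uses; Spark then only reads files directly inside the matched paths, without descending further. A small sketch (not part of the original question), assuming the same file:///var/foo/try layout:

import org.apache.hadoop.fs.Path

val base = new Path("file:///var/foo/try")
val fs = base.getFileSystem(spark.sparkContext.hadoopConfiguration)

Seq("*", "*/*", "*/*/*").foreach { pattern =>
  // paths (files or directories) that this glob actually matches
  val matches = fs.globStatus(new Path(base, pattern))
  println(s"$pattern -> ${matches.map(_.getPath.toString).mkString(", ")}")
}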

asked Dec 30 '19 by Saurav Sahu

People also ask

How does Spark read multiple files?

Spark Core provides the textFile() and wholeTextFiles() methods in the SparkContext class, which are used to read single or multiple text or CSV files into a single Spark RDD. Using these methods, we can also read all files from a directory, or files matching a specific pattern.
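
For instance, a minimal Scala sketch (assuming a SparkContext sc and a hypothetical data/ directory of text files):

// one RDD[String] with one record per line, across all matched files
val lines = sc.textFile("data/*.txt")

// one RDD[(String, String)] of (filePath, wholeFileContent) pairs
val whole = sc.wholeTextFiles("data/*.txt")

println(s"line count: ${lines.count()}, file count: ${whole.count()}")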

How can I read all files in a directory using PySpark?

If we have a folder named folder containing .txt files, we can read them all using sc.textFile("folder/*.txt").

How do I list all files recursively?

On Linux, a recursive directory listing can be produced with ls -R; the -R option tells ls to list subdirectories recursively.


1 Answer

Update

A new option, recursiveFileLookup, was introduced in Spark 3 to read files recursively from nested folders:

spark.read.option("recursiveFileLookup", "true").json("file:///var/foo/try")
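
As a related sketch (not from the original answer), this option can be combined with pathGlobFilter, also available since Spark 3, to keep only files with a given extension while still walking every sub-directory:

val jsonLogs = spark.read
  .option("recursiveFileLookup", "true") // descend into all sub-directories
  .option("pathGlobFilter", "*.json")    // keep only files whose name matches
  .json("file:///var/foo/try")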

For older versions, you can alternatively use Hadoop's listFiles to recursively list all file paths and then pass them to Spark's read:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.input_file_name

val conf = sc.hadoopConfiguration

// recursively list all file paths under the base folder
val fromFolder = new Path("file:///var/foo/try/")
val logfiles = fromFolder.getFileSystem(conf).listFiles(fromFolder, true)

var files = Seq[String]()
while (logfiles.hasNext) {
  // one can filter for specific files here
  files = files :+ logfiles.next().getPath().toString
}

// read all collected paths at once
val df = spark.read.csv(files: _*)

// verify which files were actually read
df.select(input_file_name()).distinct().show(false)


+-------------------------------------+
|input_file_name()                    |
+-------------------------------------+
|file:///var/foo/try/11thOct/log2.csv |
|file:///var/foo/try/10thOct_logs1.csv|
|file:///var/foo/try/Oct/12th/log3.csv|
+-------------------------------------+

answered Nov 01 '22 by blackbishop