Displaying the directories under which JSON files are stored:
$ tree -d try/
try/
├── 10thOct_logs1
├── 11thOct
│   └── logs2
└── Oct
    └── 12th
        └── logs3
The task is to read all the logs using SparkSession.
Is there an elegant way to read all the files in these directories and their sub-directories recursively?
The few commands that I tried are prone to unintentional exclusion.
spark.read.json("file:///var/foo/try/<exp>")
+----------+---+-----+-------+
| <exp> -> | * | */* | */*/* |
+----------+---+-----+-------+
| logs1 | y | y | n |
| logs2 | n | y | y |
| logs3 | n | n | y |
+----------+---+-----+-------+
You can see in the above table that none of the three expressions matches all the directories (located at three different depths) at the same time. Frankly speaking, I wasn't expecting 10thOct_logs1 to be excluded when using the third expression, */*/*.
This leads me to conclude that only the files or directories matched by the part of the expression following the last / are treated as exact matches, and everything else is ignored.
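The table above can be reproduced with a short check along the following lines. This is only a sketch: the helper names filesMatched and reportMatches are made up for illustration, and the local session settings are an assumption. It reads one expression at a time and collects the distinct input files Spark actually picked up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder()
  .appName("glob-depth-check")
  .master("local[*]")
  .getOrCreate()

// Hypothetical helper: distinct files read for a single glob expression under `base`.
def filesMatched(spark: SparkSession, base: String, exp: String): Seq[String] =
  spark.read.json(s"$base/$exp")
    .select(input_file_name().as("file"))
    .distinct()
    .collect()
    .map(_.getString(0))
    .toSeq

// Hypothetical helper: prints the files matched by each of the three expressions.
def reportMatches(spark: SparkSession, base: String): Unit =
  for (exp <- Seq("*", "*/*", "*/*/*")) {
    println(s"$exp -> ${filesMatched(spark, base, exp).mkString(", ")}")
  }
```

Calling reportMatches(spark, "file:///var/foo/try") should then show which log directories each expression covers.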
Spark core provides the textFile() and wholeTextFiles() methods in the SparkContext class, which read one or more text or CSV files into a single Spark RDD. These methods can also read all the files in a directory, or only the files that match a specific pattern.
For example, if a folder named folder contains only .txt files, we can read them all using sc.textFile("folder/*.txt").
On Linux, directories can be listed recursively with the ls -R command: the -R option tells ls to descend into subdirectories.
Spark 3 introduced a new option, recursiveFileLookup, for reading from nested folders:
spark.read.option("recursiveFileLookup", "true").json("file:///var/foo/try")
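As a rough sketch of how this option might be used in practice, the following combines recursiveFileLookup with pathGlobFilter, another reader option available in Spark 3, so that only JSON files are picked up at any depth. The function names readLogsRecursively and sourceFiles, the *.json filter, and the local session settings are assumptions for illustration. Note that enabling recursiveFileLookup disables partition discovery, so directory names of the form key=value are not turned into columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder()
  .appName("recursive-json-read")
  .master("local[*]")
  .getOrCreate()

// Hypothetical helper: reads every *.json file under `root`, at any depth.
def readLogsRecursively(spark: SparkSession, root: String): DataFrame =
  spark.read
    .option("recursiveFileLookup", "true") // descend into all sub-directories
    .option("pathGlobFilter", "*.json")    // keep only files whose name matches
    .json(root)

// Hypothetical helper: the distinct source files behind a DataFrame, for verification.
def sourceFiles(df: DataFrame): Seq[String] =
  df.select(input_file_name()).distinct().collect().map(_.getString(0)).toSeq
```

For the layout in the question, sourceFiles(readLogsRecursively(spark, "file:///var/foo/try")) would be expected to list files from all three log directories.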
For older versions, you can alternatively use Hadoop's listFiles to recursively list all the file paths and then pass them to the Spark reader:
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.input_file_name
import scala.collection.mutable.ArrayBuffer

val conf = sc.hadoopConfiguration
// get all file paths (the second argument of listFiles enables recursion)
val fromFolder = new Path("file:///var/foo/try/")
val logfiles = fromFolder.getFileSystem(conf).listFiles(fromFolder, true)
val files = ArrayBuffer[String]()
while (logfiles.hasNext) {
  // one can filter here some specific files
  files += logfiles.next().getPath.toString
}
// read multiple paths
val df = spark.read.csv(files.toSeq: _*)
// show which files were actually read
df.select(input_file_name()).distinct().show(false)
+-------------------------------------+
|input_file_name() |
+-------------------------------------+
|file:///var/foo/try/11thOct/log2.csv |
|file:///var/foo/try/10thOct_logs1.csv|
|file:///var/foo/try/Oct/12th/log3.csv|
+-------------------------------------+
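The comment about filtering specific files in the loop above could be fleshed out roughly as follows. This is a sketch rather than a definitive implementation: listFilesRecursively and its keep predicate are hypothetical names, and only FileSystem.listFiles with its recursive flag comes from the Hadoop API.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: recursively lists the files under `root`,
// keeping only the paths accepted by the `keep` predicate.
def listFilesRecursively(root: String, conf: Configuration)(keep: Path => Boolean): Seq[String] = {
  val rootPath = new Path(root)
  // The second argument (true) asks Hadoop to descend into sub-directories.
  val files = rootPath.getFileSystem(conf).listFiles(rootPath, true)
  val kept = ArrayBuffer[String]()
  while (files.hasNext) {
    val path = files.next().getPath
    if (keep(path)) kept += path.toString
  }
  kept.toSeq
}
```

For example, listFilesRecursively("file:///var/foo/try/", sc.hadoopConfiguration)(_.getName.endsWith(".csv")) would return only the CSV paths, which can then be passed to spark.read.csv(paths: _*) as before.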