Use Spark to list all files in a Hadoop HDFS directory?

I want to loop through all text files in a Hadoop directory and count all occurrences of the word "error". Is there a way to do the equivalent of hadoop fs -ls /users/ubuntu/ and list all the files in a directory with the Apache Spark Scala API?

Judging from the first example given, the Spark context seems to access files only individually, through something like:

val file = spark.textFile("hdfs://target_load_file.txt")

In my case, I do not know in advance how many files are in the HDFS folder or what they are named. I looked at the SparkContext docs but couldn't find this kind of functionality.

poliu2s asked Apr 28 '14 22:04

2 Answers

You can use a wildcard:

val errorCount = sc.textFile("hdfs://some-directory/*")
                   .flatMap(_.split(" ")).filter(_ == "error").count
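
One caveat with splitting on a single space: tokens such as "error," or "ERROR:" will not compare equal to "error". Below is a minimal sketch of a more tolerant count, assuming whole-word, case-insensitive matching is what you want. The name countWord is a hypothetical helper for illustration, not part of Spark.

```scala
// Hypothetical helper (not a Spark API): counts whole-word,
// case-insensitive matches of `word` in a single line of text.
def countWord(line: String, word: String): Int =
  line.split("\\W+").count(_.equalsIgnoreCase(word))

// Plain Scala check on made-up sample lines; no cluster needed.
val sample = Seq("ERROR: disk full", "no problems", "error, error again")
val total = sample.map(countWord(_, "error")).sum
println(total)
```

In spark-shell, something like sc.textFile("hdfs://some-directory/*").map(line => countWord(line, "error").toLong).fold(0L)(_ + _) should then give the total across all files matched by the wildcard.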

Daniel Darabos answered Sep 24 '22 20:09

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.{ListBuffer, Stack}

// Walk the directory tree iteratively and collect the path of every file.
val fs = FileSystem.get(sc.hadoopConfiguration)
val dirs = Stack[String]()
val files = ListBuffer.empty[String]

dirs.push("/user/username/")

while (dirs.nonEmpty) {
  // List one directory: queue its subdirectories, record its files.
  fs.listStatus(new Path(dirs.pop())).foreach { x =>
    if (x.isDirectory) dirs.push(x.getPath.toString)
    else files += x.getPath.toString
  }
}
files.foreach(println)
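
As a possible alternative to the manual stack, Hadoop's FileSystem.listFiles(path, recursive) returns an iterator over every file below a directory. This is a rough sketch, assuming Hadoop 2.x or later is on the classpath; listAllFiles is a hypothetical helper name, and the commented Spark lines assume sc is the SparkContext from spark-shell.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ListBuffer

// Hypothetical helper (illustrative name): collects the path of every file
// under `root`, descending into subdirectories.
def listAllFiles(fs: FileSystem, root: String): List[String] = {
  val it = fs.listFiles(new Path(root), true) // true = recurse into subdirectories
  val files = ListBuffer.empty[String]
  while (it.hasNext) files += it.next().getPath.toString
  files.toList
}

// Assumed usage in spark-shell, where `sc` already exists:
// val fs = FileSystem.get(sc.hadoopConfiguration)
// val paths = listAllFiles(fs, "/user/username/")
// val errorCount = sc.textFile(paths.mkString(","))
//   .flatMap(_.split(" ")).filter(_ == "error").count
```

sc.textFile accepts a comma-separated list of paths, so the collected list can be passed on directly. This would not work if the list is empty or if a path itself contains a comma.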

Animesh Raj Jha answered Sep 25 '22 20:09