
How can one list all CSV files in an HDFS location within the Spark Scala shell?

The purpose is to manipulate each data file and save a copy to a second HDFS location. I will be using

RddName.coalesce(1).saveAsTextFile(pathName)

to save the result to HDFS.

This is why I want to handle each file separately, even though I expect the performance to suffer. What I have yet to determine is how to store the list of CSV file paths in an array of strings and then loop through each one with a separate RDD.
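For illustration, the loop I have in mind would look roughly like this. This is only a sketch: it assumes the paths have already been collected into an array (which is the part I am missing), and the destination prefix is made up:

// Hypothetical: csvPaths would hold the HDFS paths of the individual files.
val csvPaths: Array[String] = ???

csvPaths.foreach { p =>
  // Read this one file as its own RDD.
  val rdd = sc.textFile(p)
  // Save a single-part copy under a second (made-up) location.
  rdd.coalesce(1).saveAsTextFile("/data/email/click_copy/" + p.split("/").last)
}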

Let us use the following anonymized example as the HDFS source locations:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

I know how to list the file paths using Hadoop FS Shell:

hdfs dfs -ls /data/email/click/*/*.csv

I know how to create one RDD for all the data:

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )
asked Sep 24 '15 by Jaime
2 Answers

I haven't tested it thoroughly, but something like this seems to work:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
import java.net.URI

val path: String = ???

// Build a Hadoop Configuration from the Spark configuration and get the FileSystem.
val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
// List the files in the directory; the second argument disables recursion.
val iter = hdfs.listFiles(new Path(path), false)

// Drain the single-pass RemoteIterator into a List of file URIs.
def listFiles(iter: RemoteIterator[LocatedFileStatus]): List[URI] = {
  @annotation.tailrec
  def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
    if (iter.hasNext) {
      val uri = iter.next.getPath.toUri
      go(iter, uri :: acc)
    } else {
      acc
    }
  }
  go(iter, List.empty[URI])
}

// Keep only the CSV files.
listFiles(iter).filter(_.toString.endsWith(".csv"))
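To drive the per-file copy from the question, the resulting URIs can be fed into a loop like this. This is a sketch: the destination prefix is hypothetical, and since RemoteIterator is single-pass, a fresh one is built here rather than reusing the drained iter:

val csvFiles = listFiles(hdfs.listFiles(new Path(path), false))
  .filter(_.toString.endsWith(".csv"))

csvFiles.foreach { uri =>
  val fileName = new Path(uri).getName
  // saveAsTextFile writes a directory named after the file, containing part-00000.
  sc.textFile(uri.toString)
    .coalesce(1)
    .saveAsTextFile("/data/email/click_copy/" + fileName)  // hypothetical target
}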
answered by zero323

This is what ultimately worked for me:

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI

val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
// source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")

// globStatus expands the glob pattern into an array of matching FileStatus entries
hdfs.globStatus(sourcePath).foreach { fileStatus =>
  val filePathName = fileStatus.getPath().toString()
  val fileName = fileStatus.getPath().getName()

  // <DO STUFF HERE>

} // end foreach loop
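For completeness, here is one way the loop body could be filled in for the copy-each-file use case from the question. The target location is hypothetical:

hdfs.globStatus(sourcePath).foreach { fileStatus =>
  val filePathName = fileStatus.getPath().toString()
  val fileName = fileStatus.getPath().getName()

  // Read this one file as its own RDD and save a single-part copy.
  sc.textFile(filePathName)
    .coalesce(1)
    .saveAsTextFile("/data/email/click_copy/" + fileName)  // hypothetical target
}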
answered by Jaime