
How to get files name with spark sc.textFile?

I am reading a directory of files using the following code:

val data = sc.textFile("/mySource/dir1/*")

Now my data RDD contains all the rows of all the files in the directory (right?).

I now want to add a column to each row with the source file's name. How can I do that?

The other option I tried is wholeTextFiles, but I keep getting out-of-memory exceptions. 5 servers, 24 cores, 24 GB each (executor-cores 5, executor-memory 5G). Any ideas?
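For reference, the wholeTextFiles approach mentioned above looks roughly like this (the path is taken from the question; everything else is illustrative). It returns one (filename, contents) pair per file, which means each file is materialized whole in executor memory before it can be split back into lines — a likely cause of the out-of-memory errors on larger files:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("WholeFiles").setMaster("local"))

// wholeTextFiles yields (path, fullContents) pairs, one per file.
// Splitting the contents gives (path, line) pairs, but the entire
// file must first fit in memory on a single executor.
val linesWithNames = sc.wholeTextFiles("/mySource/dir1/*").flatMap {
  case (path, contents) => contents.split("\n").map(line => (path, line))
}
```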

asked Dec 16 '15 by Eran Witkon
People also ask

What does SC textFile return?

textFile is a method of the org.apache.spark.SparkContext class that reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of Strings.

How do I access local files in Spark?

To access the file in Spark jobs, use SparkFiles.get(fileName) to find its download location. A directory can be given if the recursive option is set to true. Currently, directories are only supported for Hadoop-supported filesystems.
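A minimal sketch of that pattern (the file name and HDFS path are illustrative, not from the question): SparkContext.addFile distributes a file to every executor, and SparkFiles.get resolves the local copy's path on whichever node a task runs:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("AddFileDemo").setMaster("local"))

// Ship a small lookup file to every node in the cluster.
sc.addFile("hdfs:///config/lookup.txt")

// Inside a task, resolve the file's local download location by name.
val firstLine = sc.parallelize(Seq(1)).map { _ =>
  scala.io.Source.fromFile(SparkFiles.get("lookup.txt")).getLines().next()
}.first()
```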


1 Answer

You can use the following code; I have tested it with Spark 1.4 and 1.5.

It gets the file name from the InputSplit and attaches it to each line, using mapPartitionsWithInputSplit on the underlying NewHadoopRDD.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("FileNames").setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path: String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc, kc, vc, sc.hadoopConfiguration)

// Each partition of a NewHadoopRDD corresponds to one input split,
// so the split's file path can be attached to every line it contains.
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((inputSplit, iterator) => {
    val file = inputSplit.asInstanceOf[FileSplit]
    iterator.map(tup => (file.getPath, tup._2))
  })

linesWithFileNames.foreach(println)
answered Oct 07 '22 by Udy