I am reading a directory of files using the following code:
val data = sc.textFile("/mySource/dir1/*")
now my data
rdd contains all rows of all files in the directory (right?)
I want now to add a column to each row with the source files name, how can I do that?
The other options I tried is using wholeTextFile but I keep getting out of memory exceptions. 5 servers 24 cores 24 GB (executor-core 5 executor-memory 5G) any ideas?
textFile is a method of a org. apache. spark. SparkContext class that reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
To access the file in Spark jobs, use SparkFiles. get(fileName) to find its download location. A directory can be given if the recursive option is set to true. Currently directories are only supported for Hadoop-supported filesystems.
You can use this code. I have tested it with Spark 1.4 and 1.5.
It gets the file name from the inputSplit
and adds it to each line using the iterator
using the mapPartitionsWithInputSplit
of the NewHadoopRDD
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
val sc = new SparkContext(new SparkConf().setMaster("local"))
val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]
val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tup => (file.getPath, tup._2))
}
)
linesWithFileNames.foreach(println)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With