I am loading multiple files into a JavaRDD using
JavaRDD<String> allLines = sc.textFile(hdfs://path/*.csv);
After loading the files I modify each record and want to save them. However I need to also save the original file name (ID) with the record for future reference. Is there anyway that I can get the original file name from the individual record in RDD? thanks
You can try to do something like in the following snippet:
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
"hdfs://path/*.csv",
TextInputFormat.class,
LongWritable.class,
Text.class,
new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;
JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
FileSplit fileSplit = (FileSplit) inputSplit;
String fileName = fileSplit.getPath().getName();
Stream<Tuple2<String, String>> stream =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
.map(line -> {
String lineText = line._2().toString();
// emit file name as key and line as a value
return new Tuple2(fileName, lineText);
});
return stream.iterator();
}, true);
JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
@Override
public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
FileSplit fileSplit = (FileSplit) inputSplit;
final String fileName = fileSplit.getPath().getName();
return new Iterator<Tuple2<String, String>>() {
@Override
public boolean hasNext() {
return lines.hasNext();
}
@Override
public Tuple2<String, String> next() {
Tuple2<LongWritable, Text> entry = lines.next();
return new Tuple2<String, String>(fileName, entry._2().toString());
}
};
}
},
true
);
You want spark's wholeTextFiles function. From the documentation:
For example, if you have the following files:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn
Do val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path"),
then rdd contains
(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)
It returns you an RDD of tuples where the left is the filename and the right is the content.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With