How to get the file name for a record in a Spark RDD (JavaRDD)

I am loading multiple files into a JavaRDD using

JavaRDD<String> allLines = sc.textFile("hdfs://path/*.csv");

After loading the files, I modify each record and want to save the results. However, I also need to save the original file name (ID) with each record for future reference. Is there any way to get the original file name from an individual record in the RDD? Thanks.

asked Sep 08 '15 by Asha

2 Answers

You can try something like the following snippet:

import java.util.*;
import java.util.stream.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.*;
import scala.Tuple2;

// Load the files through the new Hadoop API so that each partition's
// input split (and therefore its source file) is accessible.
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
    "hdfs://path/*.csv",
    TextInputFormat.class,
    LongWritable.class,
    Text.class,
    new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD =
    (JavaNewHadoopRDD<LongWritable, Text>) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD =
    hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
        FileSplit fileSplit = (FileSplit) inputSplit;
        String fileName = fileSplit.getPath().getName();

        Stream<Tuple2<String, String>> stream =
            StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
                .map(line -> {
                    String lineText = line._2().toString();
                    // emit the file name as the key and the line as the value
                    return new Tuple2<>(fileName, lineText);
                });
        return stream.iterator();
    }, true);
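
From here the file name travels with every record, so the asker's modify-and-save step could look like the following minimal sketch (transformRecord and the output path are hypothetical placeholders, not part of the answer above):

// Minimal sketch: transformRecord and the output path are assumptions.
JavaRDD<String> output = namedLinesRDD.map(pair -> {
    String fileName = pair._1();                   // the original file name (ID)
    String modified = transformRecord(pair._2());  // hypothetical per-record modification
    return fileName + "," + modified;              // keep the file name with the record
});
output.saveAsTextFile("hdfs://path-to-output");    // hypothetical output location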

Update (for Java 7):

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
    new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
        @Override
        public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
            FileSplit fileSplit = (FileSplit) inputSplit;
            final String fileName = fileSplit.getPath().getName();
            // wrap the partition iterator, pairing each line with its file name
            return new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }
                @Override
                public Tuple2<String, String> next() {
                    Tuple2<LongWritable, Text> entry = lines.next();
                    return new Tuple2<String, String>(fileName, entry._2().toString());
                }
                @Override
                public void remove() {
                    // Iterator.remove() must be implemented explicitly before Java 8
                    throw new UnsupportedOperationException();
                }
            };
        }
    },
    true
);
answered Oct 05 '22 by szhem


You want Spark's wholeTextFiles function. From the documentation:

For example, if you have the following files:

   hdfs://a-hdfs-path/part-00000
   hdfs://a-hdfs-path/part-00001
   ...
   hdfs://a-hdfs-path/part-nnnnn

Do val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"),

then rdd contains

   (a-hdfs-path/part-00000, its content)
   (a-hdfs-path/part-00001, its content)
   ...
   (a-hdfs-path/part-nnnnn, its content)

It returns an RDD of tuples where the left element is the file name and the right is the file's content.
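
Since the question uses the Java API, a minimal sketch of the same idea with JavaSparkContext.wholeTextFiles might look like this (assuming Spark 2.x, where FlatMapFunction returns an Iterator, and a plain newline split):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// (filePath, fileContent) pairs, one entry per file
JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles("hdfs://path/*.csv");

// split each file's content back into lines, tagging every line with its file path
JavaRDD<Tuple2<String, String>> namedLines = filesRDD.flatMap(file -> {
    List<Tuple2<String, String>> result = new ArrayList<>();
    for (String line : file._2().split("\n")) {
        result.add(new Tuple2<>(file._1(), line));
    }
    return result.iterator();
});

Note that wholeTextFiles materializes each file's entire content as a single record, so it suits many small files; for very large files the input-split approach in the first answer scales better.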

answered Oct 05 '22 by dpeacock