Computing set intersection and set difference of the records of two files with hadoop

Sorry for cross-posting this on the Hadoop user mailing list and here, but this is becoming an urgent matter for me.

My problem is as follows: I have two input files, and I want to determine

  • a) The number of lines which only occur in file 1
  • b) The number of lines which only occur in file 2
  • c) The number of lines common to both (i.e. compared by string equality)

Example:

File 1:
a
b
c

File 2:
a
d

Desired output for each case:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)
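For small inputs the desired result is plain set arithmetic on the distinct lines of each file. The following sketch (plain Java, no Hadoop; the class name `LineSetCounts` is hypothetical) computes the three numbers in memory, which may be handy for checking a MapReduce job's output on test data. It assumes, as in the example, that lines are compared by exact string equality and that a line repeated within one file counts once.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LineSetCounts {

    /** Returns {onlyIn1, onlyIn2, inBoth}, counting distinct lines. */
    static long[] count(List<String> file1, List<String> file2) {
        Set<String> s1 = new HashSet<>(file1);
        Set<String> s2 = new HashSet<>(file2);

        // Intersection: lines present in both files.
        Set<String> both = new HashSet<>(s1);
        both.retainAll(s2);

        // Set differences: |s1 \ s2| and |s2 \ s1|.
        long onlyIn1 = s1.size() - both.size();
        long onlyIn2 = s2.size() - both.size();
        return new long[] {onlyIn1, onlyIn2, both.size()};
    }

    public static void main(String[] args) {
        long[] r = count(List.of("a", "b", "c"), List.of("a", "d"));
        System.out.println("lines_only_in_1: " + r[0]);
        System.out.println("lines_only_in_2: " + r[1]);
        System.out.println("lines_in_both:   " + r[2]);
    }
}
```

For the example files this prints 2, 1 and 1, matching the desired output.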

Basically, my approach is as follows: I wrote my own LineRecordReader, so that the mapper receives a pair consisting of the line (text) and a byte indicating the source file (either 0 or 1). The mapper simply emits the pair again, so it effectively does nothing. The side effect, however, is that the combiner receives a

Map<Line, Iterable<SourceId>>

(where SourceId is either 0 or 1).

Now, for each line I can get the set of sources it appears in. Therefore, I wrote a combiner that counts the number of lines for each case (a, b, c); see Listing 1.

The combiner outputs this 'summary' only in cleanup() (is that safe?). The summary looks like this:

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901

The reducer then simply sums up the values of these summaries, so its output looks just like that of the combiner.
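One thing worth checking before relying on the combiner: Hadoop treats a combiner as an optional optimization that may run zero, one or several times, each time over only part of a key's records (typically the output of a single map task). With the default FileInputFormat every split comes from a single file, so a map-side combiner never sees both source ids for a line. The plain-Java simulation below (hypothetical class `CombinerPitfall`, no Hadoop involved, assuming one map task per file) compares classifying per map task and summing the summaries against classifying after all records for a line have been grouped.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CombinerPitfall {

    /** Groups lines by value, collecting the source ids seen for each line. */
    static void group(List<String> lines, byte sourceId, Map<String, Set<Byte>> into) {
        for (String line : lines) {
            into.computeIfAbsent(line, k -> new HashSet<>()).add(sourceId);
        }
    }

    /** Classifies each grouped line; returns {onlyIn1, onlyIn2, inBoth}. */
    static long[] classify(Map<String, Set<Byte>> grouped) {
        long only0 = 0, only1 = 0, both = 0;
        for (Set<Byte> ids : grouped.values()) {
            if (ids.size() >= 2) both++;
            else if (ids.contains((byte) 0)) only0++;
            else only1++;
        }
        return new long[] {only0, only1, both};
    }

    /** Simulates one combiner per map task (one task per file), then sums the summaries. */
    static long[] perTaskThenSum(List<String> file1, List<String> file2) {
        Map<String, Set<Byte>> task0 = new HashMap<>();
        Map<String, Set<Byte>> task1 = new HashMap<>();
        group(file1, (byte) 0, task0);
        group(file2, (byte) 1, task1);
        long[] a = classify(task0), b = classify(task1);
        return new long[] {a[0] + b[0], a[1] + b[1], a[2] + b[2]};
    }

    /** Simulates classifying after the shuffle, where all records for a line meet. */
    static long[] afterShuffle(List<String> file1, List<String> file2) {
        Map<String, Set<Byte>> all = new HashMap<>();
        group(file1, (byte) 0, all);
        group(file2, (byte) 1, all);
        return classify(all);
    }

    public static void main(String[] args) {
        List<String> f1 = List.of("a", "b", "c");
        List<String> f2 = List.of("a", "d");
        System.out.println(Arrays.toString(perTaskThenSum(f1, f2))); // [3, 2, 0]
        System.out.println(Arrays.toString(afterShuffle(f1, f2)));   // [2, 1, 1]
    }
}
```

For the example files, the per-task variant reports [3, 2, 0] instead of the expected [2, 1, 1], which suggests doing the classification in the reducer rather than in a combiner.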

However, the main problem is that I need to treat both source files as a single virtual file that yields records of the form (line, sourceId), where sourceId is either 0 or 1.

And I am not sure how to achieve that. So the question is whether I can avoid preprocessing and merging the files beforehand, and instead do it on the fly with something like a virtually merged file reader and a custom record reader. Any code example is much appreciated.
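Conceptually, the "virtually merged file" is just the records of both files in sequence, each tagged with its source id. A minimal plain-Java sketch of that record stream (no Hadoop; `VirtualMerge`, `TaggedLine` and `readTagged` are hypothetical names) could look like this:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class VirtualMerge {

    /** One record of the "virtual" merged file: a line plus the id of its source file. */
    static final class TaggedLine {
        final String line;
        final byte sourceId;

        TaggedLine(String line, byte sourceId) {
            this.line = line;
            this.sourceId = sourceId;
        }

        @Override
        public String toString() {
            return "(" + line + ", " + sourceId + ")";
        }
    }

    /** Reads the files one after another, tagging each line with its file's index (0, 1, ...). */
    static List<TaggedLine> readTagged(List<Path> files) throws IOException {
        List<TaggedLine> records = new ArrayList<>();
        for (int i = 0; i < files.size(); i++) {
            try (BufferedReader reader = Files.newBufferedReader(files.get(i), StandardCharsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    records.add(new TaggedLine(line, (byte) i));
                }
            }
        }
        return records;
    }
}
```

For the two example files this yields (a, 0), (b, 0), (c, 0), (a, 1), (d, 1). In a MapReduce job a physical merge should not be necessary to get this effect: a job accepts several input paths (e.g. one `FileInputFormat.addInputPath` call per file), and each map task can tell which file its split belongs to.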

Best regards, Claus

Listing 1:

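// Nested in the job class; the enclosing file needs:
// import java.io.IOException; import java.util.HashSet; import java.util.Set;
// import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.LongWritable;
// import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
//
// NOTE: Hadoop requires a combiner's output key/value types to equal the map
// output types (Text, ByteWritable). Because this class emits LongWritable
// values, it can only be registered as the reducer, not as a combiner.
public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0; // lines only in source 0
    private long countB = 0; // lines only in source 1
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        // Collect the distinct source ids this line occurs in.
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }

        // Count each line in exactly one category, matching the desired output.
        if (fileIds.size() >= 2) { ++countC; }
        else if (fileIds.contains((byte) 0)) { ++countA; }
        else { ++countB; }
    }

    // Emit the summary once, after all keys of this task have been processed.
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException
    {
        context.write(new Text("lines_only_in_1"), new LongWritable(countA));
        context.write(new Text("lines_only_in_2"), new LongWritable(countB));
        context.write(new Text("lines_in_both"), new LongWritable(countC));
    }
}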
asked Nov 05 '22 19:11 by raven_arkadon


1 Answer

Okay, I must admit that I didn't really grasp what you've tried so far, but here is a simple approach that should do what you need.

Have a look at the FileMapper. It looks up the name of the file its input split belongs to and emits that name together with each line of the input.

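    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Name of the file this task's split belongs to; an instance field,
        // since a static one would be shared by mappers running in one JVM.
        private Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The line becomes the key, so the shuffle groups all file names per line.
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // Assumes a FileInputFormat-based input, whose splits are FileSplits.
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }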

Now we have a bunch of key/value pairs that look like this (for your example):

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2

The shuffle then groups these pairs by key, so your reducer receives input like this:

    a File 1,File 2
    b File 1
    c File 1
    d File 2
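What happens between the mapper and the reducer can be mimicked in a few lines of plain Java: pairs are grouped by key, and keys reach a reducer in sorted order. The sketch below (hypothetical class `ShuffleSim`, no Hadoop; "File 1" and "File 2" stand in for real file names) reproduces the grouping shown above. Hadoop itself neither sorts nor deduplicates the values of a key by default; the `TreeSet` here just keeps the output deterministic.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ShuffleSim {

    /** Emits (line, fileName) for every line, like FileMapper, and groups the pairs by line. */
    static Map<String, Set<String>> mapAndGroup(Map<String, List<String>> filesByName) {
        // TreeMap: keys come out sorted, as they arrive at a reducer.
        Map<String, Set<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : filesByName.entrySet()) {
            for (String line : file.getValue()) {
                grouped.computeIfAbsent(line, k -> new TreeSet<>()).add(file.getKey());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new TreeMap<>();
        files.put("File 1", List.of("a", "b", "c"));
        files.put("File 2", List.of("a", "d"));
        mapAndGroup(files).forEach((line, names) ->
                System.out.println(line + " " + String.join(",", names)));
    }
}
```

Running `main` prints the four grouped lines in the same form as the listing above.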

What you need to do in your reducer could look like this:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    public enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct file names this line occurs in. toString() copies
        // the value, which matters because Hadoop reuses the Text instance.
        Set<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // With exactly two input files, two distinct names in the set
        // mean the line is contained in both files.
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // Only one file name: determine which file it was by its name.
            String name = set.iterator().next();
            if (name.equals("YOUR_FIRST_FILE_NAME")) {
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}

Replace the string in the if statement with the name of your first input file.
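To try the reducer's decision logic without a cluster, here is a plain-Java sketch (the class `ClassifySim` and its `firstFileName` parameter are hypothetical) that applies the same rules to already-grouped input and tallies them in an `EnumMap` standing in for the job counters. Passing the first file's name in, rather than hardcoding it, is just one option; in a real job you might, for example, put it into the job `Configuration` and read it in the reducer's `setup()`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;

class ClassifySim {

    enum Counter { LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND }

    /** Applies FileReducer's rules to each line's set of file names and counts the outcomes. */
    static Map<Counter, Long> classify(Map<String, Set<String>> grouped, String firstFileName) {
        Map<Counter, Long> counters = new EnumMap<>(Counter.class);
        for (Counter c : Counter.values()) {
            counters.put(c, 0L);
        }

        for (Set<String> fileNames : grouped.values()) {
            Counter c;
            if (fileNames.size() == 2) {
                c = Counter.LINES_IN_COMMON;   // seen in both files
            } else if (fileNames.contains(firstFileName)) {
                c = Counter.LINES_IN_FIRST;    // only in the first file
            } else {
                c = Counter.LINES_IN_SECOND;   // only in the second file
            }
            counters.merge(c, 1L, Long::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> grouped = Map.of(
                "a", Set.of("File 1", "File 2"),
                "b", Set.of("File 1"),
                "c", Set.of("File 1"),
                "d", Set.of("File 2"));
        System.out.println(classify(grouped, "File 1"));
    }
}
```

For the example grouping this prints `{LINES_IN_COMMON=1, LINES_IN_FIRST=2, LINES_IN_SECOND=1}`.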

I think using job counters is a bit cleaner than keeping your own primitive fields and writing them to the context in cleanup(). You can retrieve a job's counters once it has completed:

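// needs org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapreduce.{Counters, Job}
Job job = Job.getInstance(new Configuration());
// setup (mapper, reducer, input/output paths, ...) omitted
job.waitForCompletion(true);
Counters counters = job.getCounters();
long linesInCommon = counters.findCounter(FileReducer.Counter.LINES_IN_COMMON).getValue();
long linesInFirst  = counters.findCounter(FileReducer.Counter.LINES_IN_FIRST).getValue();
long linesInSecond = counters.findCounter(FileReducer.Counter.LINES_IN_SECOND).getValue();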

Nevertheless, if you need the line counts stored in HDFS, go with your own solution.

Hope that helped you.

answered Nov 09 '22 07:11 by Thomas Jungblut