Sorry for cross-posting this on the Hadoop user mailing list and here, but this is becoming an urgent matter for me.
My problem is as follows: I have two input files, and I want to determine (a) the number of lines which only appear in file 1, (b) the number of lines which only appear in file 2, and (c) the number of lines common to both.
Example:
File 1:
a
b
c
File 2:
a
d
Desired output for each case:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
Basically my approach is as follows: I wrote my own LineRecordReader so that the mapper receives a pair consisting of the line (Text) and a byte indicating the source file (either 0 or 1). The mapper simply emits the pair again, so effectively it does nothing. The useful side effect, however, is that the combiner then receives a
Map<Line, Iterable<SourceId>>
(where SourceId is either 0 or 1).
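For reference, a minimal sketch of that pass-through mapper (the class name is mine), assuming the custom reader hands it (Text line, ByteWritable sourceId) pairs:

import java.io.IOException;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class PassThroughMapper
        extends Mapper<Text, ByteWritable, Text, ByteWritable> {
    @Override
    protected void map(Text line, ByteWritable sourceId, Context context)
            throws IOException, InterruptedException {
        // Emit the (line, sourceId) pair unchanged; the shuffle then
        // groups all source ids of identical lines for the combiner.
        context.write(line, sourceId);
    }
}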
Now, for each line, I can get the set of sources it appears in. Therefore, I can write a combiner that counts the number of lines for each case (a, b, c); see Listing 1 below.
The combiner then outputs a 'summary' only in cleanup() (is that safe?). So this summary looks like:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
In the reducer I then only sum up the values of these summaries, so the output of the reducer looks just like that of the combiner.
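For completeness, here is a minimal sketch of such a summing reducer (the class name is mine), assuming the summaries actually arrive as (Text label, LongWritable count) pairs:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class SummarySumReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up the partial counts emitted by the combiners.
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        context.write(key, new LongWritable(sum));
    }
}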
However, the main problem is that I need to treat both source files as a single virtual file which yields records of the form (line, sourceId), with sourceId either 0 or 1.
And I am not sure how to achieve that. So the question is whether I can avoid preprocessing and merging the files beforehand, and instead do it on the fly with something like a virtually-merged-file reader and a custom record reader. Any code example is much appreciated.
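One idea I had is to register each file with its own tagging mapper via MultipleInputs, but I am not sure whether that is the intended use of the class. A sketch (the paths and the Source0Mapper/Source1Mapper classes, which would emit (line, 0) and (line, 1) respectively, are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

Job job = new Job(new Configuration());
// Each input path gets its own mapper, which tags every line
// with a fixed source id (0 or 1).
MultipleInputs.addInputPath(job, new Path("/input/file1"),
        TextInputFormat.class, Source0Mapper.class);
MultipleInputs.addInputPath(job, new Path("/input/file2"),
        TextInputFormat.class, Source1Mapper.class);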
Best regards, Claus
Listing 1:
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class SourceCombiner
        extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct source ids this line was seen in.
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }
        if (fileIds.contains((byte) 0)) { ++countA; } // line occurs in source 0
        if (fileIds.contains((byte) 1)) { ++countB; } // line occurs in source 1
        if (fileIds.size() >= 2)        { ++countC; } // line occurs in both
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}
Okay, I must admit that I didn't really catch the gist of what you've tried so far, but I have a simple approach that may do what you need.
Have a look at the FileMapper below. It gets the file name and submits it with each line of the input.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text fileName;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Remember which file this map task is reading from.
        String name = ((FileSplit) context.getInputSplit()).getPath().getName();
        fileName = new Text(name);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the line as key and its source file name as value.
        context.write(value, fileName);
    }
}
Now we have a bunch of key/value pairs that look like this (with regard to your example):
a File 1
b File 1
c File 1
a File 2
d File 2
Obviously, reducing them will give you input like this:
a File 1,File 2
b File 1
c File 1
d File 2
What you need to do in your reducer could look like this:
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct file names this line occurred in.
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }
        // Since we have only two files, two distinct names in the set
        // mean the line is contained in both files.
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry, this is a bit dirty...
            // determine which file it was by checking the name:
            String name = set.iterator().next();
            if (name.equals("YOUR_FIRST_FILE_NAME")) {
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }
}
You have to replace the string inside the if statement with your first file's name.
I think that using job counters is a bit clearer than keeping your own primitives and writing them to the context in cleanup. You can retrieve the counters for a job by calling this after completion:
Job job = new Job(new Configuration());
// setup stuff etc. omitted...
job.waitForCompletion(true);
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();
long linesInFirst = job.getCounters().findCounter(Counter.LINES_IN_FIRST).getValue();
long linesInSecond = job.getCounters().findCounter(Counter.LINES_IN_SECOND).getValue();
Nevertheless, if you need the numbers of lines in common etc. to end up in HDFS, then go for your solution.
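For what it's worth, here is a small sketch of how you could dump the counter values to a file on HDFS from the driver instead (the output path is made up):

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Write the three counter values to a (hypothetical) summary file on HDFS.
FileSystem fs = FileSystem.get(job.getConfiguration());
FSDataOutputStream out = fs.create(new Path("/tmp/line-counts.txt"));
out.writeBytes("lines_in_common\t" + linesInCommon + "\n");
out.writeBytes("lines_in_first\t" + linesInFirst + "\n");
out.writeBytes("lines_in_second\t" + linesInSecond + "\n");
out.close();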
Hope that helped you.