I need to access the counters from my mapper in my reducer. Is this possible? If so, how is it done?
As an example, my mapper is:
public class CounterMapper extends Mapper<Text,Text,Text,Text> {
    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}
My reducer is:
public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}
counterValue is always 0. Am I doing something wrong or is this just not possible?
In Hadoop, the Reducer takes the output of the Mapper (intermediate key-value pairs) and processes each of them to generate the output. The output of the Reducer is the final output, which is stored in HDFS. Usually, the Reducer performs some kind of aggregation or summation.
The input to the Reducer is the grouped output of the Mappers. In the shuffle phase, the framework fetches, for each Reducer, the relevant partition of the output of all the Mappers via HTTP.
Implemented Jeff G's solution on the new API:
@Override
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}
In the Reducer's configure(JobConf), you can use the JobConf object to look up the reducer's own job id. With that, your reducer can create its own JobClient (i.e. a connection to the JobTracker) and query the counters for this job (or any job, for that matter).
// in the Reducer class...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    JobClient client = new JobClient(conf);
    RunningJob parentJob =
        client.getJob(JobID.forName( conf.get("mapred.job.id") ));
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
}
Now you can use mapperCounter inside the reduce() method itself.
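You actually need a try-catch here: the JobClient constructor, getJob() and getCounters() can all throw IOException, which configure() does not declare. I'm using the old API, but it shouldn't be hard to adapt for the new API.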
Note that mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!)
The whole point of map/reduce is to parallelize the jobs. There will be many unique mappers/reducers so the value wouldn't be correct anyway except for that run of the map/reduce pair.
They have a word count example:
http://wiki.apache.org/hadoop/WordCount
You could change the context.write(word,one) to context.write(line,one)
The global counter values are never broadcast back to each mapper or reducer. If you want the number of mapper records to be available to the reducer, you'll need to rely on some external mechanism to do this.
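To illustrate why the reducer in the question reads 0, here is a toy model in plain Java. It does not use the Hadoop API at all; `CounterVisibilityDemo`, `Task` and `jobTotal` are hypothetical names invented for this sketch. It only assumes what is described above: each task increments its own local copy of a counter, and the summed value exists only at the job level.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, NOT the Hadoop API: counters are task-local, totals are job-level. */
final class CounterVisibilityDemo {

    /** Hypothetical stand-in for a single map or reduce task. */
    static final class Task {
        private long localCount; // the counter as seen from inside this task

        void increment(long n) { localCount += n; }

        long localValue() { return localCount; }
    }

    /** Hypothetical stand-in for the job-level view that sums all task reports. */
    static long jobTotal(List<Task> tasks) {
        long total = 0;
        for (Task t : tasks) {
            total += t.localValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Task> mappers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Task mapper = new Task();
            mapper.increment(10); // pretend each mapper saw 10 records
            mappers.add(mapper);
        }

        // A reducer task starts with its own zeroed copy of the counter.
        Task reducer = new Task();

        System.out.println("reducer-local value: " + reducer.localValue()); // prints 0
        System.out.println("job-level total: " + jobTotal(mappers));        // prints 30
    }
}
```

In this model the reducer only gets 30 by asking the job-level view for it, which is the role the JobClient and Cluster lookups play in the other answers.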