
Hadoop mapreduce : Driver for chaining mappers within a MapReduce job

Tags:

java

hadoop

I have a MapReduce job. My Map class:

public static class MapClass extends Mapper<Text, Text, Text, LongWritable> {

    @Override
    public void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
    }
}

And I want to use ChainMapper:

1. Job job = new Job(conf, "Job with chained tasks");
2. job.setJarByClass(MapReduce.class);
3. job.setInputFormatClass(TextInputFormat.class);
4. job.setOutputFormatClass(TextOutputFormat.class);

5. FileInputFormat.setInputPaths(job, new Path(InputFile));
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile));

7. JobConf map1 = new JobConf(false);

8. ChainMapper.addMapper(
        job, 
        MapClass.class, 
        Text.class, 
        Text.class, 
        Text.class, 
        Text.class, 
        true, 
        map1
        ); 

but it reports an error at line 8:

Multiple markers at this line:
- Occurrence of 'addMapper'
- The method addMapper(JobConf, Class<? extends Mapper>, Class<?>, Class<?>, Class<?>, Class<?>, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (Job, Class, Class, Class, Class, Class, boolean, Configuration)
- Debug Current Instruction Pointer
- The method addMapper(JobConf, Class<? extends Mapper>, Class<?>, Class<?>, Class<?>, Class<?>, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (JobConf, Class, Class, Class, Class, Class, boolean, JobConf)

asked Jul 27 '11 07:07 by user864846

People also ask

What is chain mapper in Hadoop?

The ChainMapper class allows multiple Mapper classes to be used within a single Map task. The Mapper classes are invoked in a chained (or piped) fashion: the output of the first becomes the input of the second, and so on until the last Mapper, whose output is written to the task's output.
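That piping can be illustrated without Hadoop at all. The sketch below is a minimal plain-Java simulation of two chained mappers (the class and method names are illustrative, not Hadoop APIs): the first emits (word, 1) pairs, and the second consumes those pairs and upper-cases the key, just as ChainMapper feeds each mapper's output to the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

public class ChainSketch {
    // First "mapper": split a line into (word, 1) pairs.
    static List<Entry<String, Integer>> tokenize(String line) {
        List<Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.split("\\s+")) {
            out.add(new SimpleEntry<>(w, 1));
        }
        return out;
    }

    // Second "mapper": consumes the first mapper's output and upper-cases the key.
    static Entry<String, Integer> upperCase(Entry<String, Integer> kv) {
        return new SimpleEntry<>(kv.getKey().toUpperCase(), kv.getValue());
    }

    public static void main(String[] args) {
        // Chained invocation: the output of tokenize feeds upperCase.
        for (Entry<String, Integer> kv : tokenize("hello chain mapper")) {
            Entry<String, Integer> out = upperCase(kv);
            System.out.println(out.getKey() + "\t" + out.getValue());
        }
    }
}
```

In the real ChainMapper, the same wiring is declared in the driver, and the framework handles passing each mapper's output records to the next mapper in the chain.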

What is the preferred file format when chaining multiple MapReduce jobs?

MapReduce is a computation abstraction that works well with the Hadoop Distributed File System (HDFS). When chaining multiple MapReduce jobs, the intermediate output passed between jobs is commonly written as a binary SequenceFile, which is compact and splittable.

What is the role of driver code in MapReduce?

The intermediate output generated by the mapper is fed to the reducer, which processes it and generates the final output, which is then saved in HDFS. The major component in a MapReduce job is the Driver class. It is responsible for setting up a MapReduce job to run in Hadoop.

Can MapReduce customize number of mappers in driver class?

Yes, the number of mappers can be changed in a MapReduce job. Hundreds or thousands of mappers can run in parallel across the slaves; the degree of parallelism depends directly on the configuration of each slave, or of the machine the slave runs on, and all of these slaves write their output to local disk.
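In practice, the mapper count is driven by the number of input splits. A minimal sketch of that arithmetic, assuming a single input file and FileInputFormat's default split-size rule (splitSize = max(minSize, min(maxSize, blockSize))); the concrete sizes below are illustrative:

```java
public class SplitCount {
    // FileInputFormat's default rule: splitSize = max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long fileSize  = 1024L * 1024 * 1024;   // 1 GiB input file (illustrative)
        long blockSize = 128L * 1024 * 1024;    // 128 MiB HDFS block (illustrative)
        long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);
        // One map task per split, rounding up for a partial final split.
        long numMappers = (fileSize + splitSize - 1) / splitSize;
        System.out.println("splits/mappers: " + numMappers);
    }
}
```

Tuning the block size or the min/max split sizes is therefore the usual way to influence how many map tasks a job gets.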


1 Answer

After a lot of "Kung Fu", I was able to use ChainMapper/ChainReducer. The compile error comes from mixing the old and new APIs: ChainMapper in org.apache.hadoop.mapred.lib expects a JobConf, not the new-API Job used in the question. The driver below sticks to the old API throughout. Thanks for the last comment, user864846.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package myPKG;

/* 
 * Ajitsen: Sample program for ChainMapper/ChainReducer. This is a modified version of the WordCount example shipped with Hadoop 0.18.0, with ChainMapper/ChainReducer added and changes made so it works on Hadoop 1.0.2.
 */

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCount extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, 
                OutputCollector<Text, IntWritable> output, 
                Reporter reporter) throws IOException {
            String line = value.toString();
            System.out.println("Line:"+line);
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class UpperCaser extends MapReduceBase
    implements Mapper<Text, IntWritable, Text, IntWritable> {

        public void map(Text key, IntWritable value, 
                OutputCollector<Text, IntWritable> output, 
                Reporter reporter) throws IOException {
            String word = key.toString().toUpperCase();
            System.out.println("Upper Case:"+word);
            output.collect(new Text(word), value);    
        }
    }

    public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, 
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            System.out.println("Word:"+key.toString()+"\tCount:"+sum);
            output.collect(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println("wordcount <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCount.class);
        conf.setJobName("wordcount");

        if (args.length != 2) {
            System.out.println("ERROR: Wrong number of parameters: " +
                    args.length + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args);
        System.exit(res);
    }
}
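The data flow of the chained job above can be sketched without Hadoop: the tokenizer's (word, 1) pairs are piped through the upper-caser, then summed per key, mirroring Tokenizer → UpperCaser → Reduce. A minimal plain-Java simulation (no Hadoop types; the input lines are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChainWordCountSketch {
    public static void main(String[] args) {
        String[] lines = { "foo bar foo", "Bar baz" };
        Map<String, Integer> counts = new LinkedHashMap<>();

        for (String line : lines) {
            for (String word : line.split("\\s+")) {  // Tokenizer: emit (word, 1)
                String upper = word.toUpperCase();    // UpperCaser: upper-case the key
                counts.merge(upper, 1, Integer::sum); // Reduce: sum counts per key
            }
        }
        counts.forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

This is only the record-level semantics; the real job also gets Hadoop's sort/shuffle between the mapper chain and the reducer, which is what groups equal keys together before Reduce runs.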

EDIT: In later versions (at least from Hadoop 2.6), the true (byValue) flag in addMapper is not needed; in fact, the signature has changed to drop it.

So it would be just

JobConf mapAConf = new JobConf(false);
ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class,
                      Text.class, IntWritable.class, mapAConf);
answered Oct 12 '22 21:10 by Ajitsen