
Map-reduce job giving ClassNotFound exception even though mapper is present when running with yarn?

I am running a Hadoop job that works fine without YARN in pseudo-distributed mode, but it throws a ClassNotFoundException when I run it with YARN:

16/03/24 01:43:40 INFO mapreduce.Job: Task Id : attempt_1458775953882_0002_m_000003_1, Status : FAILED
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.hadoop.keyword.count.ItemMapper not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
    at org.apache.hadoop.mapreduce.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.keyword.count.ItemMapper not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
    ... 8 more

Here is the source code for the job:

Configuration conf = new Configuration();
conf.set("keywords", args[2]);

Job job = Job.getInstance(conf, "item count");
job.setJarByClass(ItemImpl.class);
job.setMapperClass(ItemMapper.class);
job.setReducerClass(ItemReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

Here is the command I am running

hadoop jar ~/itemcount.jar /user/rohit/tweets /home/rohit/outputs/23mar-yarn13 vodka,wine,whisky

Edit: full code, after the suggestion

package com.hadoop.keyword.count;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class ItemImpl {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("keywords", args[2]);

        Job job = Job.getInstance(conf, "item count");
        job.setJarByClass(ItemImpl.class);
        job.setMapperClass(ItemMapper.class);
        job.setReducerClass(ItemReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


    public static class ItemMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        JSONParser parser = new JSONParser();

        @Override
        public void map(Object key, Text value, Context output) throws IOException,
                InterruptedException {

            JSONObject tweetObject = null;

            String[] keywords = this.getKeyWords(output);

            try {
                tweetObject = (JSONObject) parser.parse(value.toString());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (tweetObject != null) {
                String tweetText = (String) tweetObject.get("text");

                if(tweetText == null){
                    return;
                }

                tweetText = tweetText.toLowerCase();
    /*          StringTokenizer st = new StringTokenizer(tweetText);

                ArrayList<String> tokens = new ArrayList<String>();

                while (st.hasMoreTokens()) {
                    tokens.add(st.nextToken());
                }*/

                for (String keyword : keywords) {
                    keyword = keyword.toLowerCase();
                    if (tweetText.contains(keyword)) {
                        output.write(new Text(keyword), one);
                    }
                }
                output.write(new Text("count"), one);
            }

        }

        String[] getKeyWords(Mapper<Object, Text, Text, IntWritable>.Context context) {

            Configuration conf = (Configuration) context.getConfiguration();
            String param = conf.get("keywords");

            return param.split(",");

        }
    }

    public static class ItemReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context output)
                throws IOException, InterruptedException {

            int wordCount = 0;

            for (IntWritable value : values) {
                wordCount += value.get();
            }

            output.write(key, new IntWritable(wordCount));
        }
    }
}


1 Answer

When running in fully distributed mode, your TaskTracker/NodeManager (the process that runs your mapper) runs in a separate JVM, and it sounds like your class is not making it onto that JVM's classpath.

Try passing the -libjars <csv,list,of,jars> command-line argument when you invoke the job. This has Hadoop distribute the jar to the TaskTracker/NodeManager JVMs and load your classes from that jar. (Note that this copies the jar out to each node in your cluster and makes it available only for that specific job. If you have common libraries that many jobs need, look into the Hadoop distributed cache instead.)
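
One caveat worth flagging: -libjars is a generic option, and Hadoop only honours generic options when the driver hands its arguments to GenericOptionsParser, usually via ToolRunner. Since your main() reads args[0..2] directly, here is a minimal sketch of the driver reworked around ToolRunner, keeping the same class names and argument layout as in your question; treat it as illustrative rather than a drop-in replacement.

package com.hadoop.keyword.count;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ItemImpl extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // ToolRunner/GenericOptionsParser strips generic options such as
        // -libjars, -files and -D before passing the remaining args to run().
        System.exit(ToolRunner.run(new Configuration(), new ItemImpl(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("keywords", args[2]);

        Job job = Job.getInstance(conf, "item count");
        job.setJarByClass(ItemImpl.class);
        job.setMapperClass(ItemMapper.class);   // same nested ItemMapper as in the question
        job.setReducerClass(ItemReducer.class); // same nested ItemReducer as in the question

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // ItemMapper and ItemReducer stay exactly as in the question.
}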

You may also want to try launching the job with yarn jar ... instead of hadoop jar ..., since that is the newer/preferred way to launch YARN jobs.
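
For example, assuming the jar's manifest does not already declare a main class (drop the explicit class name if it does), the invocation could look like this, with the paths and keyword list copied from your original command:

yarn jar ~/itemcount.jar com.hadoop.keyword.count.ItemImpl -libjars ~/itemcount.jar /user/rohit/tweets /home/rohit/outputs/23mar-yarn13 vodka,wine,whisky

Passing the job jar itself via -libjars is somewhat redundant when setJarByClass is working, but it is a cheap way to rule out classpath distribution as the culprit.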
