I am writing hadoop programs , and i really dont want to play with deprecated classes . Anywhere online i am not able to find programs with updated
org.apache.hadoop.conf.Configuration
class insted of
org.apache.hadoop.mapred.JobConf
class.
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(Test.class);
conf.setJobName("TESST");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
This is how my main() looks like. Can please anyone will provide me with updated function.
Here it's the classic WordCount example. You'll notice a tone of other imports that may not be necessary, reading the code you'll figure out which is which.
What's different? I'm using the Tool interface and the GenericOptionParser to parse the job command a.k.a : hadoop jar ....
In the mapper you'll notice a run thing. You can get rid of that, it's usually called by default when you supply the code for the Map method. I put it there to give you the info that you can further control the mapping stage. This is all using the new API. I hope you find it useful. Any other questions, let me know!
import java.io.IOException;
import java.util.*;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.GenericOptionsParser;
public class Inception extends Configured implements Tool{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
public void run (Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJarByClass(WordCount.class);
job.submit();
return 0;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
ToolRunner.run(new WordCount(), otherArgs);
}
}
Also take classic WordCount as example:
org.apache.hadoop.mapred.JobConf
is old, in new version we use Configuration
and Job
to achieve.
Please use org.apache.hadoop.mapreduce.lib.*
(it is new API) instead of org.apache.hadoop.mapred.TextInputFormat
(it is old).
The Mapper
and Reducer
are nothing new, please see main
function, it includes relatively overall configurations, feel free to change them according to your specific requirements.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text outputKey;
private IntWritable outputVal;
@Override
public void setup(Context context) {
outputKey = new Text();
outputVal = new IntWritable(1);
}
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer stk = new StringTokenizer(value.toString());
while(stk.hasMoreTokens()) {
outputKey.set(stk.nextToken());
context.write(outputKey, outputVal);
}
}
}
class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result;
@Override
public void setup(Context context) {
result = new IntWritable();
}
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable val: values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
if(args.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Word Count");
// set jar
job.setJarByClass(WordCount.class);
// set Mapper, Combiner, Reducer
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
/* Optional, set customer defined Partioner:
* job.setPartitionerClass(MyPartioner.class);
*/
// set output key
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// set input and output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// by default, Hadoop use TextInputFormat and TextOutputFormat
// any customer defined input and output class must implement InputFormat/OutputFormat interface
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With