I created a driver which reads a config file, builds a list of objects (based on the config) and passes that list to MapReduce (MapReduce has a static attribute which holds a reference to that list of object).
It works but only locally. As soon as I run the job on a cluster config I will get all sort of errors suggesting that the list hasn't been built. It makes me think that I'm doing it wrong and on a cluster setup MapReduce is being run independently from the driver.
My question is how to correctly initialise a Mapper.
(I'm using Hadoop 2.4.1)
This is related to the problem of side data distribution.
There are two approaches for side data distribution.
1) Distributed Caches
2) Configuration
As you have the objects to be shared, we can use the Configuration class.
This discussion will depend on the Configuration class to make available an Object across the cluster, accessible to all Mappers and(or) Reducers. The approach here is quite simple. The setString(String, String) setter of the Configuration classed is harnessed to achieve this task. The Object that has to be shared across is serialized into a java string at the driver end and is de-serialized back to the object at the Mapper or Reducer.
In the example code below, I have used com.google.gson.Gson class for the easy serialization and deserialization. You can use Java Serialization as well.
Class that Represents the Object You need to Share
public class TestBean {
String string1;
String string2;
public TestBean(String test1, String test2) {
super();
this.string1 = test1;
this.string2 = test2;
}
public TestBean() {
this("", "");
}
public String getString1() {
return string1;
}
public void setString1(String test1) {
this.string1 = test1;
}
public String getString2() {
return string2;
}
public void setString2(String test2) {
this.string2 = test2;
}
}
The Main Class from where you can set the Configurations
public class GSONTestDriver {
public static void main(String[] args) throws Exception {
System.out.println("In Main");
Configuration conf = new Configuration();
TestBean testB1 = new TestBean("Hello1","Gson1");
TestBean testB2 = new TestBean("Hello2","Gson2");
Gson gson = new Gson();
String testSerialization1 = gson.toJson(testB1);
String testSerialization2 = gson.toJson(testB2);
conf.set("instance1", testSerialization1);
conf.set("instance2", testSerialization2);
Job job = new Job(conf, " GSON Test");
job.setJarByClass(GSONTestDriver.class);
job.setMapperClass(GSONTestMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
The mapper class from where you can retrieve the object
public class GSONTestMapper extends
Mapper<LongWritable, Text, Text, NullWritable> {
Configuration conf;
String inst1;
String inst2;
public void setup(Context context) {
conf = context.getConfiguration();
inst1 = conf.get("instance1");
inst2 = conf.get("instance2");
Gson gson = new Gson();
TestBean tb1 = gson.fromJson(inst1, TestBean.class);
System.out.println(tb1.getString1());
System.out.println(tb1.getString2());
TestBean tb2 = gson.fromJson(inst2, TestBean.class);
System.out.println(tb2.getString1());
System.out.println(tb2.getString2());
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
The bean is converted to a serialized Json String using the toJson(Object src) method of the class com.google.gson.Gson. Then the serialised Json string is passed as value through the configuration instance and accessed by name from the Mapper. The string is deserialized there using the fromJson(String json, Class classOfT) method of the same Gson class. Instead of my test bean, you could place your objects.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With