I am new to the Hadoop framework. I am trying to write a program that reads an XML file from HDFS, parses it using JDOM, and sends the result to a database. The following is the Java file:
package JDOMprs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
import org.jdom2.input.SAXBuilder;
import com.vertica.hadoop.VerticaOutputFormat;
import com.vertica.hadoop.VerticaRecord;
public class ExampleParser extends Configured implements Tool {
    public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static DoubleWritable one = new DoubleWritable(1);
        private Text word = new Text();
        private List mylist;
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, one);
        }
    }
    public static class Reduce extends Reducer<Text, DoubleWritable, Text, VerticaRecord> {
        VerticaRecord record = null;
        String src_name;
        String comment;
        String rev_by;
        String rev_dt;
        String com_title;
        public void setup(Context context) throws IOException, InterruptedException {
            try {
                record = new VerticaRecord(context.getConfiguration());
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        public void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            if (record == null) {
                throw new IOException("No output record found");
            }
            /******************** JDOM PARSER ***************************/
            SAXBuilder builder = new SAXBuilder();
            // File xmlFile = new
            // File("C:/Users/Administrator/workspace/VerticaHadoop/src/JDOMprs/HadoopXML.xml");
            try {
                Document document = (Document) builder.build(key.toString());
                Element rootNode = document.getRootElement();
                List list = rootNode.getChildren("source");
                // List ls= new ArrayList();
                // Jdomparse jp= new Jdomparse();
                // ls=jp.getParse(key);
                for (int i = 0; i < list.size(); i++) {
                    Element node = (Element) list.get(i);
                    // System.out.println("Source Name : " +
                    // node.getChildText("source-name"));
                    // System.out.println("comment : " +
                    // node.getChildText("comment"));
                    // System.out.println("review by : " +
                    // node.getChildText("review-by"));
                    // System.out.println("review date : " +
                    // node.getChildText("review-date"));
                    // System.out.println("comment-title : " +
                    // node.getChildText("comment-title"));
                    record.set(0, node.getChildText("source-name").toString());
                    record.set(0, node.getChildText("comment").toString());
                    record.set(0, node.getChildText("review-by").toString());
                    record.set(0, node.getChildText("review-date").toString());
                    record.set(0, node.getChildText("comment-title").toString());
                }
            } catch (IOException io) {
            } catch (JDOMException jdomex) {
            }
            /****************** END OF PARSER *****************************/
            context.write(new Text("reviewtbl"), record);
        }
    }
    public int run(String[] args) throws Exception {
        // Set up the configuration and job objects
        Configuration conf = getConf();
        Job job = new Job(conf);
        conf = job.getConfiguration();
        conf.set("mapreduce.job.tracker", "local");
        job.setJobName("vertica test");
        FileInputFormat.addInputPath(job, new Path("/user/cloudera/input"));
        VerticaOutputFormat.setOutput(job, "reviewtbl", true, "source varchar",
                "comment varchar", "rev_by varchar", "rev_dt varchar",
                "com_title varchar");
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ExampleParser(), args);
    }
}
But when I run it, I get the following exception:
12/12/20 02:41:34 INFO mapred.JobClient: Cleaning up the staging area hdfs://
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:947)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)
at ExampleParser.run(ExampleParser.java:148)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at ExampleParser.main(ExampleParser.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
... 19 more
You can't use or instantiate FileInputFormat directly: it's an abstract class.
If you want to parse the XML yourself, you'll need to write your own InputFormat that extends FileInputFormat, with a record reader that passes the entire contents of the file to the mapper as the value. I think Hadoop: The Definitive Guide has an example called WholeFileInputFormat, or something like that, or a web search will probably turn something up.
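To see why the stack trace ends in java.lang.InstantiationException, here is a minimal sketch that reproduces the same failure with plain Java reflection and no Hadoop on the classpath. The names BaseFormat, WholeFileFormat, and newInstance are hypothetical stand-ins for illustration only (BaseFormat plays the role of an abstract class such as FileInputFormat, and WholeFileFormat the role of your own concrete subclass); this is not Hadoop's actual code, just the same kind of reflective constructor call that appears in the trace.

```java
import java.lang.reflect.Constructor;

public class AbstractInstantiationDemo {

    // Hypothetical stand-in for an abstract framework class such as FileInputFormat.
    static abstract class BaseFormat {
        abstract String describe();
    }

    // Hypothetical concrete subclass, standing in for a custom InputFormat.
    static class WholeFileFormat extends BaseFormat {
        @Override
        String describe() {
            return "whole-file";
        }
    }

    // Create an instance reflectively through the no-argument constructor.
    // Constructor.newInstance throws InstantiationException for an abstract class.
    static BaseFormat newInstance(Class<? extends BaseFormat> cls)
            throws ReflectiveOperationException {
        Constructor<? extends BaseFormat> ctor = cls.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        try {
            // Fails: an abstract class has a constructor but cannot be instantiated.
            newInstance(BaseFormat.class);
        } catch (InstantiationException e) {
            System.out.println("abstract class: " + e.getClass().getName());
        }
        // Succeeds: the concrete subclass can be created the same way.
        System.out.println("concrete class: " + newInstance(WholeFileFormat.class).describe());
    }
}
```

In the real job, the equivalent fix would be to pass a concrete subclass of FileInputFormat (your own, or one shipped with Hadoop) to job.setInputFormatClass(...), rather than the abstract class itself.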