
Hadoop - Writing to HBase directly from the Mapper

I have a Hadoop job whose output should be written to HBase. I do not really need a reducer; the kind of row I would like to insert is determined in the mapper.

How can I use TableOutputFormat to achieve this? All the examples I have seen assume that the reducer is the one creating the Put, and that TableMapper is only for reading from an HBase table.

In my case the input is in HDFS and the output is a Put to a specific table. I cannot find anything in TableMapReduceUtil that helps with that either.

Is there any example out there that can help me with that?

BTW, I am using the new Hadoop API.

yosi asked Jun 16 '12 08:06


2 Answers

This example reads a file and puts every line into HBase. It comes from "HBase: The Definitive Guide", and you can find it in the book's repository. To get it, clone the repo to your computer:

git clone https://github.com/larsgeorge/hbase-book.git

The book also explains the code in detail, but if anything is unclear, feel free to ask.

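    import java.io.IOException;

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.HelpFormatter;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;
    import org.apache.commons.cli.PosixParser;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class ImportFromFile {
      public static final String NAME = "ImportFromFile";
      public enum Counters { LINES }

      // Map-only import: every input line becomes one Put that is written
      // straight to the table through TableOutputFormat.
      static class ImportMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
        private byte[] family = null;
        private byte[] qualifier = null;

        // Read the target column ("family:qualifier") from the job
        // configuration once per task.
        @Override
        protected void setup(Context context)
            throws IOException, InterruptedException {
          String column = context.getConfiguration().get("conf.column");
          byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
          family = colkey[0];
          if (colkey.length > 1) {
            qualifier = colkey[1];
          }
        }

        @Override
        public void map(LongWritable offset, Text line, Context context)
            throws IOException {
          try {
            String lineString = line.toString();
            // The row key is the MD5 hash of the line's content.
            byte[] rowkey = DigestUtils.md5(lineString);
            Put put = new Put(rowkey);
            // Put.add(...) is the 2012-era call used by the book; later
            // HBase releases offer Put.addColumn(...) instead.
            put.add(family, qualifier, Bytes.toBytes(lineString));
            context.write(new ImmutableBytesWritable(rowkey), put);
            context.getCounter(Counters.LINES).increment(1);
          } catch (Exception e) {
            // Errors are only logged; the line is skipped and the job continues.
            e.printStackTrace();
          }
        }
      }

      // Parse the command line: -t table, -c family:qualifier, -i input path.
      private static CommandLine parseArgs(String[] args) throws ParseException {
        Options options = new Options();
        Option o = new Option("t", "table", true,
          "table to import into (must exist)");
        o.setArgName("table-name");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("c", "column", true,
          "column to store row data into (must exist)");
        o.setArgName("family:qualifier");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("i", "input", true,
          "the directory or file to read from");
        o.setArgName("path-in-HDFS");
        o.setRequired(true);
        options.addOption(o);
        options.addOption("d", "debug", false, "switch on DEBUG log level");
        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
          cmd = parser.parse(options, args);
        } catch (Exception e) {
          System.err.println("ERROR: " + e.getMessage() + "\n");
          HelpFormatter formatter = new HelpFormatter();
          formatter.printHelp(NAME + " ", options, true);
          System.exit(-1);
        }
        return cmd;
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs =
          new GenericOptionsParser(conf, args).getRemainingArgs();
        CommandLine cmd = parseArgs(otherArgs);
        String table = cmd.getOptionValue("t");
        String input = cmd.getOptionValue("i");
        String column = cmd.getOptionValue("c");
        // Hand the target column to the mappers through the configuration.
        conf.set("conf.column", column);
        Job job = new Job(conf, "Import from file " + input + " into table " + table);

        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        // Send the mapper output to the HBase table named by OUTPUT_TABLE.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Zero reducers makes this a map-only job.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }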
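
Two details of the mapper above can be tried without a cluster: how the `family:qualifier` option is split in `setup()`, and how the row key is derived in `map()`. The following is a minimal, stdlib-only sketch, not the book's code. `splitColumn` is a hypothetical stand-in that only approximates `KeyValue.parseColumn` (whose edge-case behavior may differ between HBase versions), and `md5RowKey` assumes the line is encoded as UTF-8 before hashing.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Stdlib-only illustration; none of this is HBase or commons-codec API. */
final class ImportMapperSketch {

    /**
     * Hypothetical stand-in for KeyValue.parseColumn: splits "family:qualifier"
     * at the first ':' and returns only the family when no qualifier follows.
     */
    static String[] splitColumn(String column) {
        int idx = column.indexOf(':');
        if (idx < 0) {
            return new String[] { column };
        }
        if (idx == column.length() - 1) {
            return new String[] { column.substring(0, idx) };
        }
        return new String[] { column.substring(0, idx), column.substring(idx + 1) };
    }

    /** MD5 digest of the line (UTF-8 assumed), usable as a 16-byte row key. */
    static byte[] md5RowKey(String line) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return md5.digest(line.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(splitColumn("data:json")));          // [data, json]
        System.out.println(md5RowKey("first line").length);                     // 16
        System.out.println(Arrays.equals(md5RowKey("dup"), md5RowKey("dup")));  // true
    }
}
```

Because the key is a digest of the line's content, two identical input lines produce the same row key, so the second Put lands on the same row and column as the first instead of creating a new row. The `LINES` counter therefore counts processed lines, not distinct rows.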
wonsky answered Nov 12 '22 16:11


You just need to make the mapper output the (key, Put) pair itself. The OutputFormat only specifies how to persist the output key-value pairs; it does not require that they come from a reducer. You would do something like this in the mapper:

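... extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    ...
    ...
    // TableMapper is only needed when the input is an HBase table; the
    // LongWritable/Text input types here assume plain text input from HDFS.
    // Emitting a Delete as well requires a value type both classes share.
    context.write(<some key>, <some Put or Delete object>);
}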
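
To make the data flow concrete, here is a toy model in plain Java with no Hadoop or HBase classes involved. `Mutation`, `Sink`, and `map` are hypothetical stand-ins for `Put`/`Delete`, the record writer behind the output format, and `Mapper.map`. The input convention (a `rowkey<TAB>value` line, where an empty value means delete) is also made up for illustration. It shows only the shape of the idea: the mapper decides which kind of mutation to emit, and with zero reducers each emitted pair goes straight to whatever persists the output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: these types are not Hadoop or HBase classes. */
final class MapOnlyFlowSketch {

    /** Stand-in for an HBase Put or Delete. */
    static final class Mutation {
        final String kind;   // "PUT" or "DELETE"
        final String rowKey;
        final String value;  // null for deletes

        Mutation(String kind, String rowKey, String value) {
            this.kind = kind;
            this.rowKey = rowKey;
            this.value = value;
        }
    }

    /** Stand-in for the record writer behind an output format. */
    interface Sink {
        void write(String key, Mutation mutation);
    }

    /** Stand-in for Mapper.map: picks the mutation type per input line. */
    static void map(String line, Sink sink) {
        String[] parts = line.split("\t", -1);
        String rowKey = parts[0];
        String value = parts.length > 1 ? parts[1] : "";
        Mutation mutation = value.isEmpty()
                ? new Mutation("DELETE", rowKey, null)
                : new Mutation("PUT", rowKey, value);
        sink.write(rowKey, mutation);
    }

    /** Runs a "map-only job": nothing sits between map() and the sink. */
    static List<Mutation> run(List<String> lines) {
        List<Mutation> persisted = new ArrayList<>();
        Sink sink = (key, mutation) -> persisted.add(mutation);
        for (String line : lines) {
            map(line, sink);
        }
        return persisted;
    }

    public static void main(String[] args) {
        for (Mutation m : run(Arrays.asList("row1\thello", "row2\t"))) {
            System.out.println(m.kind + " " + m.rowKey);  // PUT row1, then DELETE row2
        }
    }
}
```

In a real job the sink role is played by TableOutputFormat, and `job.setNumReduceTasks(0)` is what removes the reduce step.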
Hari Menon answered Nov 12 '22 17:11
