
HBase Map-only Row Delete

This is my first time writing an HBase MapReduce job, and I'm having trouble deleting rows from HBase (I'm trying to run it as a map-only job). The job succeeds, it scans the HBase table, and the mapper receives the correct row keys from HBase (verified through sysout). However, the Delete created with new Delete(row.get()) and written to the context doesn't seem to actually remove anything.

Below is the code I'm trying to run:

HBaseDelete.java

public class HBaseDelete { 
  public static void main(String[] args) throws Exception {

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "log_table");
    job.setJarByClass(HBaseDeleteMapper.class);     

    Scan scan = new Scan();
    scan.setCaching(500);        
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }

  }
}

HBaseDeleteMapper.java

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete>{
  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    Delete delete = new Delete(row.get());
    context.write(row, delete);
  }
}

Is there something missing to 'commit' the deletion?

asked by eangeles


2 Answers

You're writing the Delete to the context, not to the table. Your mapper should look something like this:

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable>{

    private HTable myTable;

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Optional: emit the deleted row keys if you need them for something downstream */
    }

    protected void cleanup(Context context) throws IOException, InterruptedException { 
        myTable.close(); /* Close table */
    }

}

Please notice that delete operations don't use the client-side write buffer; this code will issue one RPC per delete, which is not good for this type of job. To address that, you can build your own List<Delete> to batch them:

public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable>{

    private HTable myTable;
    private List<Delete> deleteList = new ArrayList<Delete>();
    final private int buffer = 10000; /* Buffer size, tune it as desired */

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size()==buffer) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear(); /* Clear batch */
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deleteList.size()>0) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }

}
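
As an aside (a sketch, not something the answer above relies on): the question's original approach of emitting Delete objects from the mapper can also be made to work by wiring the job to HBase's TableOutputFormat instead of NullOutputFormat, for example via TableMapReduceUtil.initTableReducerJob with a null reducer. Roughly, the driver from the question would change like this:

// Sketch: keep the original HBaseDeleteMapper that emits (ImmutableBytesWritable, Delete)
// pairs, but let TableOutputFormat apply the Deletes to the table.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "log_table");
job.setJarByClass(HBaseDeleteMapper.class);

Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);

TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);
// Configures TableOutputFormat with "log_table" as the target table; a null
// reducer plus zero reduce tasks keeps the job map-only.
TableMapReduceUtil.initTableReducerJob("log_table", null, job);
job.setNumReduceTasks(0);

if (!job.waitForCompletion(true)) {
    throw new IOException("error with job!");
}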
answered by Rubén Moraleda


COMPLETE CODE

The code below scans your HBase table for row keys that contain a common substring and deletes them whenever the list grows beyond 1000 entries (this keeps the list from exhausting heap space). It also writes these row keys to HDFS.

Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.oclc.wcsync.hadoop.mapper.HbaseBulkDeleteMapper;
import org.oclc.wcsync.hadoop.util.JobName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/** doc. */
public class HbaseBulkDelete extends Configured implements Tool{

    /** doc. */
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDelete.class);


    /**
     * doc.
     * @param args ...
     * @throws Exception ...
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(HBaseConfiguration.create(), new HbaseBulkDelete(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] strings) throws Exception {


        JobName jobName = JobName.HBASE_DELETE;
        LOG.info ("Got into class driver");
        Configuration conf = HBaseConfiguration.create ();
        String env = "prod";
        Properties hadoopProps = new Properties();
        hadoopProps.load(HbaseBulkDelete.class.getResourceAsStream("/hadoop.config." + env + ".properties"));
        conf.set("jobName", jobName.name());
        conf.set ("hbase.master.catalog.timeout","600000");
        conf.set ("hbase.client.scanner.timeout.period","600000");
        conf.set ("hbase.rpc.timeout","6000000");
        conf.set ("mapred.task.timeout","6000000");
        conf.set("mapreduce.map.memory.mb","4096");
        Job job = new Job(conf);
        job.setJobName(jobName.format("HbaseBulkDelete"));
        job.setJarByClass(HbaseBulkDelete.class);
        Scan s = new Scan ();
        s.addFamily(Bytes.toBytes("data"));
        s.setStartRow (Bytes.toBytes ("Your_Substring"));

        TableMapReduceUtil.initTableMapperJob("Ingest", s, HbaseBulkDeleteMapper.class, Text.class,
                Text.class, job);

        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/user/neethu/HbaseBulkDelete"));


        return job.waitForCompletion(true) ? 0 : -1;
    }
}

MAPPER

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HbaseBulkDeleteMapper extends TableMapper<Text, Text> {
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDeleteMapper.class);

    private Configuration conf;
    private HTable table;
    private List<Delete> listOfBatchDelete = new ArrayList<Delete>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        /* One HTable instance per mapper, used to issue the batched deletes */
        table = new HTable(conf, "Ingest");
    }

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        /* Flush the batch once it grows past 1000 deletes so the list does not blow up the heap */
        if (listOfBatchDelete.size() > 1000) {
            LOG.info("Deleting batch of " + listOfBatchDelete.size() + " records");
            table.delete(listOfBatchDelete);
            listOfBatchDelete.clear();
        }
        String key = Bytes.toString(values.getRow());
        if (key.contains("Your_substring")) {
            LOG.info("RowKey:" + key);
            listOfBatchDelete.add(new Delete(Bytes.toBytes(key)));
            /* Also record the deleted row key in the HDFS output */
            context.write(new Text("RowKey"), new Text(key));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        /* Submit whatever is left in the batch, then close the table */
        if (!listOfBatchDelete.isEmpty()) {
            table.delete(listOfBatchDelete);
        }
        table.close();
    }
}
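
One possible refinement (a sketch, not part of the answer above): rather than checking for "Your_substring" inside the mapper, the substring match can be pushed to the region servers with a RowFilter and a SubstringComparator, so only matching rows are ever shipped to the mapper. Note that setStartRow only helps when the substring is actually a key prefix, while a RowFilter matches it anywhere in the row key. The driver-side Scan would then look roughly like:

// Sketch: server-side substring filter on the row key (imports from
// org.apache.hadoop.hbase.filter: RowFilter, CompareFilter, SubstringComparator).
Scan s = new Scan();
s.addFamily(Bytes.toBytes("data"));
s.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
        new SubstringComparator("Your_substring")));
// Pass this Scan to TableMapReduceUtil.initTableMapperJob as before; the mapper
// then only sees rows whose keys contain the placeholder substring.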
answered by Neethu Lalitha