First time writing an HBase MapReduce job and I'm having trouble deleting rows in HBase (I'm trying to run it as a map-only job). The job succeeds, the scan over the HBase table works, and the mapper receives the correct row keys from HBase (verified through sysout). However, the call to Delete del = new Delete(row.get()) doesn't seem to actually do anything.
Below is the code I'm trying to run:
HBaseDelete.java
public class HBaseDelete {
    public static void main(String[] args) throws Exception {

        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "log_table");
        job.setJarByClass(HBaseDeleteMapper.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
HBaseDeleteMapper.java
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        Delete delete = new Delete(row.get());
        context.write(row, delete);
    }
}
Is there something missing to 'commit' the deletion?
You're writing the Delete to the context, not to the table. Your mapper should look something like this:
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {

    private HTable myTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Only emit the deleted row keys if you need them for something (skip this if you don't) */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        myTable.close(); /* Close table */
    }
}
Note that delete operations don't go through the client-side write buffer, so this code issues one RPC per delete, which is not good for this type of job. To address that, you can build your own List<Delete> and batch them:
public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable> {

    private HTable myTable;
    private List<Delete> deleteList = new ArrayList<Delete>();
    final private int buffer = 10000; /* Buffer size, tune it as desired */

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size() == buffer) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear();         /* Clear batch */
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deleteList.size() > 0) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }
}
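On newer HBase client versions (1.x and later), where the HTable constructors used above are deprecated, the client's BufferedMutator does this buffering for you. Below is a minimal sketch of the same mapper, assuming the "myTable" table from the examples above; the flush size is controlled by the mutator's write buffer settings rather than a hand-rolled list:
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable> {

    private Connection connection;
    private BufferedMutator mutator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* One connection per mapper; the BufferedMutator batches mutations client-side */
        connection = ConnectionFactory.createConnection(context.getConfiguration());
        mutator = connection.getBufferedMutator(TableName.valueOf("myTable"));
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        mutator.mutate(new Delete(row.get())); /* Buffered; flushed automatically when the buffer fills */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mutator.close();    /* Flushes any remaining buffered deletes */
        connection.close();
    }
}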
The code below scans your HBase table for row keys that contain a common substring and deletes them whenever the list grows past 1000 entries (this keeps the list from blowing up the heap). It also writes the matching row keys to HDFS.
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.oclc.wcsync.hadoop.mapper.HbaseBulkDeleteMapper;
import org.oclc.wcsync.hadoop.util.JobName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
/** doc. */
public class HbaseBulkDelete extends Configured implements Tool {

    /** doc. */
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDelete.class);

    /**
     * doc.
     * @param args ...
     * @throws Exception ...
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(HBaseConfiguration.create(), new HbaseBulkDelete(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] strings) throws Exception {

        JobName jobName = JobName.HBASE_DELETE;
        LOG.info("Got into class driver");

        Configuration conf = HBaseConfiguration.create();
        String env = "prod";
        Properties hadoopProps = new Properties();
        hadoopProps.load(HbaseBulkDelete.class.getResourceAsStream("/hadoop.config." + env + ".properties"));
        conf.set("jobName", jobName.name());
        conf.set("hbase.master.catalog.timeout", "600000");
        conf.set("hbase.client.scanner.timeout.period", "600000");
        conf.set("hbase.rpc.timeout", "6000000");
        conf.set("mapred.task.timeout", "6000000");
        conf.set("mapreduce.map.memory.mb", "4096");

        Job job = new Job(conf);
        job.setJobName(jobName.format("HbaseBulkDelete"));
        job.setJarByClass(HbaseBulkDelete.class);

        Scan s = new Scan();
        s.addFamily(Bytes.toBytes("data"));
        s.setStartRow(Bytes.toBytes("Your_Substring"));

        /* Mapper output key/value classes are Text, Text (not an OutputFormat class) */
        TableMapReduceUtil.initTableMapperJob("Ingest", s, HbaseBulkDeleteMapper.class, Text.class,
                Text.class, job);

        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/user/neethu/HbaseBulkDelete"));

        return job.waitForCompletion(true) ? 0 : -1;
    }
}
Mapper
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HbaseBulkDeleteMapper extends TableMapper<Text, Text> {

    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDeleteMapper.class);
    Configuration conf;
    HTable table;
    List<Delete> listOfBatchDelete = new ArrayList<Delete>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        table = new HTable(conf, "Ingest"); /* One HTable instance per mapper, not one per row */
    }

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        if (listOfBatchDelete.size() > 1000) {
            table.delete(listOfBatchDelete); /* Submit batch so the list doesn't blow up the heap */
            LOG.info("Deleted records!");
            listOfBatchDelete.clear();
        }
        String key = Bytes.toString(values.getRow());
        try {
            if (key.contains("Your_substring")) {
                LOG.info("RowKey:" + key);
                listOfBatchDelete.add(new Delete(Bytes.toBytes(key)));
                context.write(new Text("RowKey"), new Text(key)); /* Also record the row key in HDFS */
            }
        } catch (Exception e) {
            LOG.error("error ---" + e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!listOfBatchDelete.isEmpty()) {
            table.delete(listOfBatchDelete); /* Submit the remaining batch */
        }
        table.close();
    }
}
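For completeness, coming back to the original question: writing the Delete to the context can work as-is, as long as the job's output goes to HBase's TableOutputFormat instead of NullOutputFormat, so the framework applies the mutations for you. Here is a sketch of how the driver from the question could be wired that way, with the mapper left untouched; exact TableMapReduceUtil signatures may differ slightly between HBase versions:
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "log_table");
job.setJarByClass(HBaseDeleteMapper.class);

Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);

/* Mapper still emits (ImmutableBytesWritable, Delete) pairs, exactly as in the question */
TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

/* Null reducer: sets up TableOutputFormat so the emitted Deletes are applied to log_table */
TableMapReduceUtil.initTableReducerJob("log_table", null, job);
job.setNumReduceTasks(0); /* Keep it map-only */

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}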