Currently I have two HBase tables (let's call them tableA and tableB). Using a single-stage MapReduce job, the data in tableA is read, processed, and saved to tableB. Currently both tables reside on the same HBase cluster. However, I need to relocate tableB to its own cluster.
Is it possible to configure a single-stage MapReduce job in Hadoop to read from and write to separate instances of HBase?
In MapReduce, extend TableMapper to scan one table and open an HBase Table object for the second table inside the mapper; that way you can join the two tables.
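A rough sketch of that idea, assuming the HBase 1.0+ client API; the table name "tableB" and the join logic are placeholders:
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public static class JoinMapper extends TableMapper<Text, Text> {

    private Connection connection;
    private Table secondTable;

    @Override
    protected void setup(Context context) throws IOException {
        // Open a second table alongside the one the job scans
        connection = ConnectionFactory.createConnection(context.getConfiguration());
        secondTable = connection.getTable(TableName.valueOf("tableB"));
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // Look up the matching row in the second table to join the two
        Result other = secondTable.get(new Get(row.get()));
        // ... combine value and other, then context.write(...)
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        if (secondTable != null) secondTable.close();
        if (connection != null) connection.close();
    }
}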
Basically, to perform CRUD operations on HBase tables we use the Java client API for HBase. Since HBase is written in Java and has a native Java API, it offers programmatic access to DML (Data Manipulation Language).
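For instance, a minimal CRUD sketch with the HBase 1.0+ client API; the table, column family, and qualifier names are placeholders:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class CrudExample {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("myTable"))) {

            // Create / Update
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
            table.put(put);

            // Read
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));

            // Delete
            table.delete(new Delete(Bytes.toBytes("row1")));
        }
    }
}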
hbase-env.sh provides a handy mechanism for this kind of cluster configuration. HBase uses the Secure Shell (ssh) command and utilities extensively to communicate between cluster nodes. Each server in the cluster must be running ssh so that the Hadoop and HBase daemons can be managed.
It is possible: HBase's CopyTable MapReduce job does it by using TableMapReduceUtil.initTableReducerJob(), which allows you to set an alternative quorumAddress in case you need to write to remote clusters:
public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
quorumAddress - Distant cluster to write to; default is null for output to the cluster that is designated in hbase-site.xml. Set this String to the ZooKeeper ensemble of an alternate remote cluster when you would have the reduce write to a cluster other than the default; e.g. when copying tables between clusters, the source would be designated by hbase-site.xml and this param would have the ensemble address of the remote cluster. The format to pass is particular: pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>, such as server,server2,server3:2181:/hbase.
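For example, a minimal job driver sketch using that overload; the ZooKeeper hostnames zk1,zk2,zk3 and the CrossClusterCopy class are placeholders, and IdentityTableReducer simply passes the mapper's Puts through to the output table:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class CrossClusterCopy {

    // Turns each source row into a Put for the remote table (straight copy)
    public static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(row.get());
            for (Cell cell : value.rawCells()) {
                put.add(cell);
            }
            context.write(row, put);
        }
    }

    public static void main(String[] args) throws Exception {
        // Source cluster comes from the hbase-site.xml on the classpath
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "tableA -> remote tableB");
        job.setJarByClass(CrossClusterCopy.class);

        Scan scan = new Scan();
        scan.setCaching(500);       // bigger scanner caching is typical for MR jobs
        scan.setCacheBlocks(false); // don't churn the region server block cache

        // Read tableA from the local (source) cluster
        TableMapReduceUtil.initTableMapperJob("tableA", scan, CopyMapper.class,
                ImmutableBytesWritable.class, Put.class, job);

        // Write tableB on the remote cluster named by quorumAddress
        TableMapReduceUtil.initTableReducerJob("tableB", IdentityTableReducer.class,
                job, null, "zk1,zk2,zk3:2181:/hbase", null, null);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}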
Another option is to implement your own custom reducer to write to the remote table instead of writing to the context. Something similar to this:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class MyReducer extends Reducer<Text, Result, Text, Text> {

    protected Connection connection;
    protected BufferedMutator remoteTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Clone the job configuration and point it at the remote cluster's quorum
        Configuration config = HBaseConfiguration.create(context.getConfiguration());
        config.set("hbase.zookeeper.quorum", "quorum1,quorum2,quorum3");
        connection = ConnectionFactory.createConnection(config); // HBase 1.0+
        // (on HBase <1.0 you would use HConnectionManager.createConnection(config)
        // and an HTableInterface with setAutoFlush(false)/setWriteBufferSize)
        // BufferedMutator buffers writes client-side, like setAutoFlush(false) did
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("myTable"))
                .writeBufferSize(10L * 1024L * 1024L); // 10MB buffer
        remoteTable = connection.getBufferedMutator(params);
    }

    @Override
    public void reduce(Text boardKey, Iterable<Result> results, Context context)
            throws IOException, InterruptedException {
        /* Build Puts from the results and write them with remoteTable.mutate(put) */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (remoteTable != null) {
            remoteTable.flush(); // push any buffered writes to the remote cluster
            remoteTable.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
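Note that with this approach the reducer never writes through the job context, so you would typically pair it with TableMapReduceUtil.initTableMapperJob() against the source table and set the job's output format to NullOutputFormat so Hadoop does not expect any output from the reducers.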