I have this code that saves to the HBase table HTABLE. The expected behavior is that the puts are pushed ("flushed") to HBase once for each partition.
NOTE: This is the updated code
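```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Put}

rdd.foreachPartition(p => {
  // `connection` must be an HBase Connection available on the executor
  // (Connection is not serializable, so it cannot be shipped from the driver)
  val table = connection.getTable(TableName.valueOf(HTABLE))
  val mutator = connection.getBufferedMutator(TableName.valueOf(HTABLE))
  p.foreach(row => {
    val hRow = new Put(rowkey) // rowkey: Array[Byte] derived from `row`
    hRow.addColumn....
    // use table.exists instead of table.checkAndPut (so the writes can go through BufferedMutator and be flushed together)
    val exists = table.exists(new Get(rowkey))
    if (!exists) {
      hRow.addColumn...
    }
    mutator.mutate(hRow) // buffered on the client, not sent yet
  })
  table.close()
  mutator.flush() // push this partition's buffered puts to HBase
  mutator.close()
})
```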
In HBase 1.1, HTable is deprecated and there's no flushCommits() available in org.apache.hadoop.hbase.client.Table.
Switching to BufferedMutator.mutate(put) works for normal puts, but BufferedMutator has no checkAndPut equivalent to the one on Table.
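One possible approach, sketched here as an assumption rather than the only answer: keep Table for the writes that must be conditional and use BufferedMutator only for unconditional puts, since both come from the same Connection. Note that the exists-then-mutate pattern in the question's code is not atomic: another client can create the row between the exists call and the buffered write being flushed. The sketch below assumes the HBase 1.x client API, where Table.checkAndPut with a null expected value only applies the Put if the cell does not exist yet (newer releases deprecate it in favour of checkAndMutate). The class name, method names, and the column cf:q are hypothetical.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ConditionalPutExample {
    // Hypothetical column family and qualifier, for illustration only.
    static final byte[] CF = Bytes.toBytes("cf");
    static final byte[] QUAL = Bytes.toBytes("q");

    /** Builds a single-cell Put; needs no cluster. */
    static Put buildPut(String rowKey, String value) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(CF, QUAL, Bytes.toBytes(value));
        return put;
    }

    /**
     * Atomically writes the cell only if cf:q is not set on that row yet.
     * Returns true if the Put was applied, false if the cell already existed.
     */
    static boolean putIfAbsent(Connection connection, String tableName,
                               String rowKey, String value) throws IOException {
        Put put = buildPut(rowKey, value);
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            // HBase 1.x Table API: a null expected value means "cell must not exist".
            return table.checkAndPut(put.getRow(), CF, QUAL, null, put);
        }
    }
}
```

Each checkAndPut is its own round trip to the region server, so this is slower than buffered puts; that is the price of the atomic check.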
In the new API, BufferedMutator is used. You could change Table t = connection.getTable(TableName.valueOf("foo")) to BufferedMutator t = connection.getBufferedMutator(TableName.valueOf("foo")), and then change t.put(p); to t.mutate(p);
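A minimal Java sketch of that change, assuming the HBase 1.x client (hbase-client on the classpath) and a cluster configured through hbase-site.xml. The table name "foo" comes from the answer; the class, method names, and column cf:q are hypothetical. Because BufferedMutator has no flushCommits(), flush() takes over that role, and close() also flushes any pending mutations.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteExample {
    // Hypothetical column family and qualifier, for illustration only.
    static final byte[] CF = Bytes.toBytes("cf");
    static final byte[] QUAL = Bytes.toBytes("q");

    /** Builds a single-cell Put; needs no cluster. */
    static Put buildPut(String rowKey, String value) {
        Put p = new Put(Bytes.toBytes(rowKey));
        p.addColumn(CF, QUAL, Bytes.toBytes(value));
        return p;
    }

    /** Sends the puts through a BufferedMutator and flushes before returning. */
    static void writeAll(Connection connection, String table, List<Put> puts) throws IOException {
        // try-with-resources closes the mutator; close() flushes as well
        try (BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(table))) {
            mutator.mutate(puts); // buffered client-side, replaces t.put(p)
            mutator.flush();      // explicit flush, replaces HTable.flushCommits()
        }
    }

    public static void main(String[] args) throws IOException {
        // Requires a reachable HBase cluster.
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
            writeAll(connection, "foo", Arrays.asList(buildPut("row1", "a"), buildPut("row2", "b")));
        }
    }
}
```

In the Spark code from the question, the same open/mutate/flush/close sequence would run once per partition inside foreachPartition.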
It works for me!
There was little information about this when I searched, even in the official documentation. I hope my answer helps, and that someone updates the documentation.
For HBase 0.94, you need to set autoFlush to false; see section 11.7.4 in http://hbase.apache.org/0.94/book/perf.writing.html
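For the older 0.94 client that the linked book describes, the pattern is to turn off auto-flush on HTable and call flushCommits() yourself; in 1.x, HTable is deprecated and this write-buffer role moves to BufferedMutator. A rough Java sketch, assuming a 0.94-era HBase jar and a reachable cluster; the table "foo", column cf:q, 4 MB buffer size, and class name are hypothetical.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class AutoFlushExample {
    // Hypothetical column family and qualifier, for illustration only.
    static final byte[] CF = Bytes.toBytes("cf");
    static final byte[] QUAL = Bytes.toBytes("q");

    /** Builds a single-cell Put; needs no cluster. */
    static Put buildPut(String rowKey, String value) {
        Put p = new Put(Bytes.toBytes(rowKey));
        p.add(CF, QUAL, Bytes.toBytes(value)); // 0.94 name; 1.x renames it addColumn
        return p;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "foo");
        try {
            table.setAutoFlush(false);                  // buffer puts on the client
            table.setWriteBufferSize(4L * 1024 * 1024); // hypothetical 4 MB buffer
            for (int i = 0; i < 1000; i++) {
                table.put(buildPut("row" + i, "v" + i)); // queued; sent when the buffer fills
            }
            table.flushCommits();                       // send whatever is still buffered
        } finally {
            table.close();
        }
    }
}
```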