I have a Storm topology running on Hadoop configured in pseudo-distributed mode. The topology contains a bolt which has to write data to HBase.
My first approach, for testing purposes, was to create (and close) a connection and write data right inside my bolt's execute method. However, it looks like my local machine doesn't have enough resources to process all the requests coming to HBase. After about 30 successfully processed requests I see the following in my Storm workers' logs:
o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
My thought was to reduce the number of connections to HBase by creating a single connection per instance of my bolt: open the connection in the prepare method and close it in cleanup. However, according to the documentation, cleanup is not guaranteed to be called in distributed mode.
After this I found Storm's framework for working with HBase: storm-hbase. Unfortunately there is almost no information about it, just the README in its GitHub repo.
Besides that, I need to be able to delete cells from the HBase table, but I didn't find anything about that in the storm-hbase docs.
Thanks in advance!
Oh boy, my time to shine! I've had to do a ton of optimization writing to HBase from Storm so hopefully this will help you.
If you're just getting started, storm-hbase is a great way to start streaming data into HBase. You can just clone the project, do a maven install, and then reference it in your topology.
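For reference, wiring it up looks roughly like this (this follows the storm-hbase README; the "word"/"count" fields, the "WordCount" table name and the "hbase.conf" config key are just illustrative and may differ by version):

// map tuple fields to HBase columns
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")                  // tuple field used as the row key
        .withColumnFields(new Fields("word"))     // fields stored as column values
        .withCounterFields(new Fields("count"))   // fields applied as counter increments
        .withColumnFamily("cf");                  // target column family

// the bolt writes to the "WordCount" table using HBase settings stored under "hbase.conf"
HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper)
        .withConfigKey("hbase.conf");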
However, if your logic gets more complicated, then creating your own classes to talk to HBase is probably the way to go, and that is what I'm going to show in the rest of this answer.
I'm assuming you're using Maven and the maven-shade plugin. You'll need to reference hbase-client:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
Also make sure to package hbase-site.xml in your topology jar. You can download this file from your cluster and just put it in src/main/resources. I also have one for testing in dev named hbase-site.dev.xml. Then just use the shade plugin to move it to the root of the jar.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <artifactSet>
            <excludes>
                <exclude>classworlds:classworlds</exclude>
                <exclude>junit:junit</exclude>
                <exclude>jmock:*</exclude>
                <exclude>*:xml-apis</exclude>
                <exclude>org.apache.maven:lib:tests</exclude>
                <exclude>log4j:log4j:jar:</exclude>
                <exclude>org.testng:testng</exclude>
            </excludes>
        </artifactSet>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>core-site.xml</resource>
                        <file>src/main/resources/core-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>hbase-site.xml</resource>
                        <file>src/main/resources/hbase-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>hdfs-site.xml</resource>
                        <file>src/main/resources/hdfs-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                            <exclude>junit/*</exclude>
                            <exclude>webapps/</exclude>
                            <exclude>testng*</exclude>
                            <exclude>*.js</exclude>
                            <exclude>*.png</exclude>
                            <exclude>*.css</exclude>
                            <exclude>*.json</exclude>
                            <exclude>*.csv</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
Note: I have lines in there for the other configs I use, so remove them if you don't need them. As an aside, I don't really like packaging the configs like this, BUT... it makes setting up the HBase connection much easier and solves a bunch of weird connection errors.
3/19/2018 update: The API for HBase has changed significantly since I wrote this answer but the concepts are the same.
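For reference, with the newer client API the connection setup below maps roughly onto the following (a sketch only; the "fruit" table name is illustrative, and you should check the client docs for your exact version):

Configuration config = HBaseConfiguration.create();
// ConnectionFactory/Connection replace HConnectionManager/HConnection
Connection connection = ConnectionFactory.createConnection(config);
// Table replaces HTableInterface
Table table = connection.getTable(TableName.valueOf("fruit"));
// BufferedMutator replaces the setAutoFlush(false) buffering used further down
BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("fruit"));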
The most important thing is to create one HConnection for each instance of your bolt in the prepare method and then re-use that connection for the entire lifetime of the bolt!
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
To start, you can do single PUTs into HBase, opening and closing the table on each call:
// single put method
private HConnection connection;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        // do stuff
        // call putFruit
    } catch (Exception e) {
        LOG.error("bolt error", e);
        collector.reportError(e);
    }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
    HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
    try {
        Put p = new Put(key.getBytes());
        long ts = data.getTimestamp();
        p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
        table.put(p);
    } finally {
        table.close();
    }
}
Notice I'm re-using the connection here. I recommend starting here because this is simpler to get working and to debug. Eventually this won't scale due to the number of requests you're trying to send across the network and you'll need to start batching multiple PUTs together.
In order to batch PUTs, you'll need to open a table using your HConnection and keep it open. You'll also need to set Auto Flush to false. This means the table will automatically buffer requests until it reaches the "hbase.client.write.buffer" size (default is 2097152).
// batch put method
private static boolean AUTO_FLUSH = false;
private static boolean CLEAR_BUFFER_ON_FAIL = false;

private HConnection connection;
private HTableInterface fruitTable;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
    fruitTable = connection.getTable(Constants.TABLE_FRUIT);
    fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        // do stuff
        // call putFruit
    } catch (Exception e) {
        LOG.error("bolt error", e);
        collector.reportError(e);
    }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
    Put p = new Put(key.getBytes());
    long ts = data.getTimestamp();
    p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
    fruitTable.put(p);
}
In either approach it is a good idea to still try to close your HBase connection in cleanup. Just be aware it might not be called before your worker is killed.
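A minimal sketch of that cleanup, reusing the fields from the batch example above (assumptions: the batch-put fields and an old-style HTableInterface/HConnection):

@Override
public void cleanup() {
    try {
        if (fruitTable != null) {
            fruitTable.flushCommits();   // push anything still sitting in the write buffer
            fruitTable.close();
        }
        if (connection != null) {
            connection.close();
        }
    } catch (IOException e) {
        LOG.error("error closing HBase connection", e);
    }
}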
To delete cells, do the same thing but use new Delete(key); instead of a Put; see the sketch below. Let me know if you have more questions!
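A minimal delete sketch mirroring putFruit above (the deleteFruit name is hypothetical, and the commented deleteColumns call is just one option; adjust to whatever granularity you need):

// example delete method, mirroring putFruit above
private void deleteFruit(String key) throws IOException {
    HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
    try {
        Delete d = new Delete(key.getBytes());
        // with no extra calls this deletes the whole row; to remove a single column instead:
        // d.deleteColumns(Constants.FRUIT_FAMILY, Constants.COLOR);
        table.delete(d);
    } finally {
        table.close();
    }
}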
You can, for example, use a "publisher" thread.
That is: have a separate class running as a thread which does the requests to HBase/MySQL/Elasticsearch/HDFS/etc. for you, and for performance reasons does them in batches.
Have a global blocking queue to hand tuples over to the publisher, and an executor service to run it:
private transient BlockingQueue<Tuple> insertQueue;
private transient ExecutorService theExecutor;
private transient Future<?> publisherFuture;
Have a thread class which will insert the documents for you:
private class Publisher implements Runnable {

    @Override
    public void run() {
        long sendBatchTs = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
            if (insertQueue.size() >= 100) {
                // 100 tuples per batch
                List<Tuple> batchQueue = new ArrayList<>(100);
                insertQueue.drainTo(batchQueue, 100);
                // write code to insert the 100 documents
                sendBatchTs = System.currentTimeMillis();
            } else if (!insertQueue.isEmpty() && System.currentTimeMillis() - sendBatchTs > 5000) {
                // flush whatever is queued to prevent tuple timeout
                int listSize = insertQueue.size();
                List<Tuple> batchQueue = new ArrayList<>(listSize);
                insertQueue.drainTo(batchQueue, listSize);
                // write code to insert the partial batch
                sendBatchTs = System.currentTimeMillis();
            } else {
                try {
                    Thread.sleep(100); // avoid busy-spinning while the queue fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit the loop on shutdown
                }
            }
        }
    }
}
Initialize the thread class and the queue in the prepare method:
@Override
public void prepare(final Map _conf, final TopologyContext _context, final OutputCollector _collector) {
    // open your connection
    insertQueue = new LinkedBlockingQueue<>();
    theExecutor = Executors.newSingleThreadExecutor();
    publisherFuture = theExecutor.submit(new Publisher());
}
Close your connection in cleanup:
@Override
public void cleanup() {
    super.cleanup();
    theExecutor.shutdown();
    publisherFuture.cancel(true);
    // close your connection
}
Collect tuples in the execute method:
@Override
public void execute(final Tuple _tuple) {
    insertQueue.add(_tuple);
}