Approach to insert and delete values in HBase from Apache Storm bolt

I have a Storm topology running on Hadoop configured in pseudo-distributed mode. The topology contains a bolt which has to write data to HBase. My first approach, for testing purposes, was to create (and close) a connection and write the data right inside my bolt's execute method. However, it looks like my local machine does not have enough resources to process all the requests coming to HBase. After about 30 successfully processed requests I see the following in my Storm workers' logs:

o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid

My thought was to reduce the number of connections to HBase by creating a single connection per instance of my bolt: open the connection in the prepare method and close it in cleanup. However, according to the documentation, cleanup is not guaranteed to be called in distributed mode.

After this I found Storm's framework for working with HBase, storm-hbase. Unfortunately there is almost no information about it, just the README in its GitHub repo.

  1. So my first question is whether using storm-hbase for Storm-HBase integration is a good solution? What would be the best way to do that?

Besides that, I need to be able to delete cells from an HBase table, but I didn't find anything about it in the storm-hbase documentation.

  2. Is it possible to do that with storm-hbase? Or, returning to the previous question, is there another way of doing it all?

Thanks in advance!

miuser


2 Answers

Oh boy, my time to shine! I've had to do a ton of optimization writing to HBase from Storm so hopefully this will help you.

If you're just getting started, storm-hbase is a great way to start streaming data into HBase. You can just clone the project, do a Maven install, and then reference it in your topology.
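
For a rough idea of what that looks like, here is a sketch patterned on the word-count example in the storm-hbase README (the table name, stream and field names, and the "hbase.conf" key below are placeholders, so adapt them to your topology):

// Sketch only: names like "word", "count", "WordCount", and "count-bolt" are placeholders.
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper)
        .withConfigKey("hbase.conf");

// HBase settings are passed through the topology config under the config key
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir", "hdfs://localhost:8020/hbase");
config.put("hbase.conf", hbConf);

builder.setBolt("hbase-bolt", hbaseBolt, 1).fieldsGrouping("count-bolt", new Fields("word"));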

However, if your logic starts getting more complicated, creating your own classes to talk to HBase is probably the way to go. That is what I'm going to show in the rest of this answer.

Project Setup

I'm assuming you're using maven and the maven-shade plugin. You'll need to reference the hbase-client:

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>${hbase.version}</version>
</dependency>

Also make sure to package hbase-site.xml in your topology jar. You can download this file from your cluster and just put it in src/main/resources. I also have one for testing in dev named hbase-site.dev.xml. Then just use the shade plugin to move it to the root of the jar.

<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>2.4</version>
   <configuration>
      <createDependencyReducedPom>true</createDependencyReducedPom>
      <artifactSet>
         <excludes>
            <exclude>classworlds:classworlds</exclude>
            <exclude>junit:junit</exclude>
            <exclude>jmock:*</exclude>
            <exclude>*:xml-apis</exclude>
            <exclude>org.apache.maven:lib:tests</exclude>
            <exclude>log4j:log4j:jar:</exclude>
            <exclude>org.testng:testng</exclude>
         </excludes>
      </artifactSet>
   </configuration>
   <executions>
      <execution>
         <phase>package</phase>
         <goals>
            <goal>shade</goal>
         </goals>
         <configuration>
            <transformers>
               <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                  <resource>core-site.xml</resource>
                  <file>src/main/resources/core-site.xml</file>
               </transformer>
               <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                  <resource>hbase-site.xml</resource>
                  <file>src/main/resources/hbase-site.xml</file>
               </transformer>
               <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                  <resource>hdfs-site.xml</resource>
                  <file>src/main/resources/hdfs-site.xml</file>
               </transformer>
               <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass></mainClass>
               </transformer>
            </transformers>
            <filters>
               <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                     <exclude>META-INF/*.SF</exclude>
                     <exclude>META-INF/*.DSA</exclude>
                     <exclude>META-INF/*.RSA</exclude>
                     <exclude>junit/*</exclude>
                     <exclude>webapps/</exclude>
                     <exclude>testng*</exclude>
                     <exclude>*.js</exclude>
                     <exclude>*.png</exclude>
                     <exclude>*.css</exclude>
                     <exclude>*.json</exclude>
                     <exclude>*.csv</exclude>
                  </excludes>
               </filter>
            </filters>
         </configuration>
      </execution>
   </executions>
</plugin>

Note: I have lines in there for the other configs I use so remove them if you don't need them. As an aside, I don't really like packaging the configs like this BUT... it makes setting up the HBase connection much easier and solves a bunch of weird connection errors.

Managing HBase Connections in Storm

3/19/2018 update: The API for HBase has changed significantly since I wrote this answer but the concepts are the same.
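
(If you are on the newer client API, the rough equivalents are sketched below; "fruit" is just a placeholder table name, and the examples that follow keep the original HConnection-style API as written.)

// Sketch for the post-1.0 HBase client API; "fruit" is a placeholder table name.
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);

// Table replaces HTableInterface for single puts/deletes
Table table = connection.getTable(TableName.valueOf("fruit"));

// BufferedMutator replaces the setAutoFlush(false) pattern for batching
BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("fruit"));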

The most important thing is to create one HConnection for each instance of your bolt in the prepare method and then re-use that connection for the entire lifetime of the bolt!

Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);

To start, you can do single PUTs into HBase. You can open/close the table each call this way.

// single put method
private HConnection connection;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
   Configuration config = HBaseConfiguration.create();
   connection = HConnectionManager.createConnection(config);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
   try {
      // do stuff
      // call putFruit
   } catch (Exception e) {
      LOG.error("bolt error", e);
      collector.reportError(e);
   }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
   HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
   try {
      Put p = new Put(key.getBytes());
      long ts = data.getTimestamp();
      p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
      p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
      p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
      table.put(p);
   } finally {
      // closing the table releases it; the shared connection stays open
      table.close();
   }
}

Notice I'm re-using the connection here. I recommend starting here because this is simpler to get working and to debug. Eventually this won't scale due to the number of requests you're trying to send across the network and you'll need to start batching multiple PUTs together.

In order to batch PUTs, you'll need to open a table using your HConnection and keep it open. You'll also need to set Auto Flush to false. This means the table will automatically buffer requests until it reaches the "hbase.client.write.buffer" size (default is 2097152).

// batch put method
private static boolean AUTO_FLUSH = false;
private static boolean CLEAR_BUFFER_ON_FAIL = false;
private HConnection connection;
private HTableInterface fruitTable;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
   Configuration config = HBaseConfiguration.create();
   connection = HConnectionManager.createConnection(config);
   fruitTable = connection.getTable(Constants.TABLE_FRUIT);
   fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
   try {
      // do stuff
      // call putFruit
   } catch (Exception e) {
      LOG.error("bolt error", e);
      collector.reportError(e);
   }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
   Put p = new Put(key.getBytes());
   long ts = data.getTimestamp();
   p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
   p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
   p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
   fruitTable.put(p);
}

In either method it is a good idea to still try to close your HBase connection in cleanup. Just be aware it might not be called before your worker is killed.
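
Here is a minimal cleanup sketch for the batching version above (a best-effort example using the same fields as the code in this answer, not something you can rely on running):

@Override
public void cleanup() {
   // Best effort only: Storm does not guarantee cleanup() runs in distributed mode.
   try {
      if (fruitTable != null) {
         fruitTable.close(); // closing flushes any puts still sitting in the write buffer
      }
      if (connection != null) {
         connection.close();
      }
   } catch (IOException e) {
      LOG.error("error closing HBase connection", e);
   }
}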

Other stuff

  • To do a delete, just use new Delete(key) instead of a Put; see the sketch below.
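
For example, a minimal sketch mirroring the putFruit method above (deleteFruit is just an illustrative name, not part of the original answer):

// hypothetical delete method following the same pattern as putFruit
private void deleteFruit(String key) throws IOException {
   HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
   try {
      Delete d = new Delete(key.getBytes());
      // optionally target specific cells instead of the whole row, e.g.:
      // d.deleteColumns(Constants.FRUIT_FAMILY, Constants.COLOR);
      table.delete(d);
   } finally {
      table.close();
   }
}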

Let me know if you have more questions.

Kit Menke


You can, for example, use a "publisher" thread.

That is: have a separate class, running as a thread, which makes the requests to HBase/MySQL/Elasticsearch/HDFS/etc. for you, and which, for performance reasons, does it in batches.

  1. Have a global blocking queue to handle the concurrent handoff, and an executor service:

    private transient BlockingQueue<Tuple> insertQueue;
    private transient ExecutorService theExecutor;
    private transient Future<?> publisherFuture;
    
  2. Have a thread class which will insert the documents for you:

    private class Publisher implements Runnable {

       @Override
       public void run() {
          long sendBatchTs = System.currentTimeMillis();

          while (true) {

             if (insertQueue.size() > 100) { // 100 tuples per batch
                List<Tuple> batchQueue = new ArrayList<>(100);
                insertQueue.drainTo(batchQueue, 100);
                // write code to insert the 100 documents
                // (and ack the tuples once the batch insert succeeds)
                sendBatchTs = System.currentTimeMillis();
             }
             else if (System.currentTimeMillis() - sendBatchTs > 5000) {
                // to prevent tuple timeouts, flush whatever is queued
                int listSize = insertQueue.size();
                List<Tuple> batchQueue = new ArrayList<>(listSize);
                insertQueue.drainTo(batchQueue, listSize);
                // write code to insert the remaining documents
                // (and ack the tuples once the batch insert succeeds)
                sendBatchTs = System.currentTimeMillis();
             }
             else {
                try {
                   Thread.sleep(100); // avoid busy-waiting between batches
                } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return; // cancelling the future in cleanup() interrupts this thread
                }
             }
          }
       }
    }
    
  3. Initialize the queue and start the publisher thread in the prepare method:

    @Override
    public void prepare(final Map _conf, final TopologyContext _context, final OutputCollector _collector) {

       // open your connection

       insertQueue = new LinkedBlockingQueue<>();
       theExecutor = Executors.newSingleThreadExecutor();
       publisherFuture = theExecutor.submit(new Publisher());
    }
    
  4. Close your connection in cleanup:

    @Override
    public void cleanup() {
       super.cleanup();
    
       theExecutor.shutdown();
       publisherFuture.cancel(true);
       // close your connection
     }
    
  5. Collect tuples in the execute method:

    @Override
    public void execute(final Tuple _tuple) {
       insertQueue.add(_tuple);
    }
    
SQL.injection