
Storm: How to integrate a Java callback into a spout

I'm trying to integrate Storm (see here) into my project. I grok the concepts of topologies, spouts, and bolts, but now I'm trying to figure out how to actually implement a few things.

A) I have a polyglot environment with Java and Clojure. My Java code is a callback class whose methods fire with streaming data. The event data pushed to those methods is what I want to use as the source for a spout.

So the first question is: how do I connect the data coming into those methods to a spout? I'm trying to i) pass a backtype.storm.topology.IRichSpout, then ii) pass a backtype.storm.spout.SpoutOutputCollector (see here) to that spout's open function (see here). But I can't see a way to actually pass in any kind of map or list.

B) The rest of my project is all Clojure. There will be a lot of data coming through those methods, and each event will have an ID between 1 and 100. In Clojure, I'll want to split the data coming from the spout into different threads of execution. Those, I think, will be the bolts.

How can I set up a Clojure bolt to take event data from the spout, then break off a thread based on the ID of the incoming event?
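In Storm, "one thread per ID" is usually expressed as a fields grouping rather than by spawning threads yourself: the bolt runs with several tasks and subscribes on the ID field, so every tuple with the same ID is delivered to the same task. In the Clojure DSL a vector of field names in the bolt's input map requests a fields grouping, e.g. `(storm/bolt-spec {"1" ["id"]} some-bolt :p 4)`, assuming the spout declares an `id` field (`some-bolt` is hypothetical). The plain-Java sketch below only illustrates that routing idea with no Storm dependency; the class name `IdRouter`, the task count, and the `Math.floorMod` hashing are assumptions for illustration, not Storm's exact hashing scheme.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of fields-grouping-style routing:
// every event with the same ID is sent to the same task index.
public class IdRouter {
    private final int numTasks;

    public IdRouter(int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        this.numTasks = numTasks;
    }

    // Map an event ID onto a task index in [0, numTasks).
    public int taskFor(int eventId) {
        return Math.floorMod(Integer.hashCode(eventId), numTasks);
    }

    // Group a batch of event IDs by the task that would handle them.
    public List<List<Integer>> partition(List<Integer> eventIds) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int id : eventIds) {
            buckets.get(taskFor(id)).add(id);
        }
        return buckets;
    }

    public static void main(String[] args) {
        IdRouter router = new IdRouter(4);
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 42, 100, 42);
        List<List<Integer>> buckets = router.partition(ids);
        for (int t = 0; t < buckets.size(); t++) {
            System.out.println("task " + t + " -> " + buckets.get(t));
        }
    }
}
```

Running `main` prints `task 2 -> [2, 42, 42]` among its lines: both events with ID 42 land on the same task, which is the property a per-ID bolt relies on.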

Thanks in advance, Tim

[EDIT 1]

I've actually gotten past this problem. I ended up 1) implementing my own IRichSpout. I then 2) connected that spout's internal tuple to the incoming stream data in my Java callback class. I'm not sure if this is idiomatic, but it compiles and runs without error. However, 3) the incoming stream data (which is definitely arriving) doesn't show up in the printstuff bolt.

In order to ensure that the event data gets propagated, is there something specific I have to do in the spout or bolt implementation or topology definition? Thanks.


      ;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple[" tuple "] > collector[" collector "]")))

      (storm/topology
        {"1" (storm/spout-spec ibspout)}
        {"3" (storm/bolt-spec {"1" :shuffle} printstuff)})

[EDIT 2]

On the advice of SO member Ankur, I'm rejigging my topology. After I've created my Java callback, I pass its tuple to the IBSpout below, using (.setTuple ibspout (.getTuple java-callback)). I don't pass the entire Java callback object, because I get a NotSerializableException. Everything compiles and runs without error, but again, no data reaches my printstuff bolt. Hmmm.


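    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;

    public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      // Set via setTuple before the topology is submitted.
      private List<Object> _tuple = new ArrayList<Object>();
      public void setTuple(List<Object> tuple) { _tuple = tuple; }
      public List<Object> getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      @Override
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      @Override public void close() {}
      @Override public void activate() {}
      @Override public void deactivate() {}
      @Override
      public void nextTuple() {
        // Emits whatever _tuple currently holds, on every call.
        _collector.emit(_tuple);
      }
      @Override public void ack(Object msgId) {}
      @Override public void fail(Object msgId) {}

      // No output fields are declared for this spout.
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      @Override
      public Map<String, Object> getComponentConfiguration() { return new HashMap<String, Object>(); }

    }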
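A likely reason nothing reaches printstuff: when a topology is submitted, Storm serializes each spout object and runs a deserialized copy, so a list handed over with setTuple is only a snapshot. Events the callback adds afterwards are not visible to the running spout. The plain-Java sketch below reproduces that effect with standard Java serialization as a model of submission; `SnapshotDemo`, `SnapshotSpout`, and `roundTrip` are hypothetical names, not Storm APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SnapshotDemo {

    // Hypothetical stand-in for IBSpout: it only holds a reference to a list.
    public static class SnapshotSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        private List<Object> tuple = new ArrayList<>();

        public void setTuple(List<Object> t) { tuple = t; }
        public List<Object> getTuple() { return tuple; }
    }

    // Serialize and deserialize an object, modelling what happens at topology submission.
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        List<Object> callbackData = new ArrayList<>();  // list owned by the callback
        SnapshotSpout spout = new SnapshotSpout();
        spout.setTuple(callbackData);

        SnapshotSpout workerCopy = roundTrip(spout);     // the copy that actually runs

        callbackData.add("event-42");                    // callback fires after submission

        System.out.println("client-side spout sees: " + spout.getTuple());     // [event-42]
        System.out.println("worker-side copy sees:  " + workerCopy.getTuple()); // []
    }
}
```

The same mechanism explains the NotSerializableException: passing the whole callback object makes it part of the spout's serialized state.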

Nutritioustim asked Oct 22 '22 13:10


1 Answer

It seems that you are passing the spout to your callback class, which is a bit unusual. When a topology is executed, Storm repeatedly calls the spout's nextTuple method. So what you need to do is pass the Java callback to your custom spout implementation, so that when Storm calls your spout, the spout calls the Java callback to get the next set of tuples to feed into the topology.

The key concept to understand is that spouts pull data when Storm requests it; you don't push data to spouts. Your callback cannot call the spout to push data into it. Rather, your spout should pull data (from some Java method or an in-memory buffer) when its nextTuple method is called.
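A minimal sketch of that pull model, written in plain Java so it runs without Storm on the classpath. The callback pushes events into a thread-safe queue, and the spout's nextTuple() takes at most one event per call without blocking. `PullModelDemo`, `EventCallback`, `QueueBackedSpout`, and the `Emitter` interface are hypothetical stand-ins. In a real spout the emitter would be the SpoutOutputCollector passed to open(), and one reasonable arrangement (an assumption, not the only option) is to create the queue and register the callback inside open(), so neither becomes part of the serialized spout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class PullModelDemo {

    // Hypothetical stand-in for SpoutOutputCollector.emit(List<Object>).
    public interface Emitter {
        void emit(List<Object> tuple);
    }

    // Hypothetical callback: the external data source calls onEvent(...) from its own thread.
    public static class EventCallback {
        private final LinkedBlockingQueue<List<Object>> queue;

        public EventCallback(LinkedBlockingQueue<List<Object>> queue) {
            this.queue = queue;
        }

        public void onEvent(int id, String payload) {
            List<Object> values = new ArrayList<>();
            values.add(id);
            values.add(payload);
            queue.offer(values);  // push into the buffer; the callback never touches the spout
        }
    }

    // Spout-like class: Storm calls nextTuple() repeatedly from the spout's own thread.
    public static class QueueBackedSpout {
        private final LinkedBlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>();
        private final Emitter emitter;

        public QueueBackedSpout(Emitter emitter) {
            this.emitter = emitter;
        }

        // Returns a callback that writes into this spout's buffer.
        public EventCallback newCallback() {
            return new EventCallback(queue);
        }

        public void nextTuple() {
            List<Object> next = queue.poll();  // non-blocking: null when nothing is buffered
            if (next != null) {
                emitter.emit(next);            // pull one event per call
            }
        }
    }

    public static void main(String[] args) {
        List<List<Object>> emitted = new ArrayList<>();
        QueueBackedSpout spout = new QueueBackedSpout(emitted::add);
        EventCallback callback = spout.newCallback();

        callback.onEvent(7, "tick");
        callback.onEvent(42, "trade");

        for (int i = 0; i < 3; i++) {
            spout.nextTuple();  // the third call finds the queue empty and emits nothing
        }
        System.out.println(emitted);  // [[7, tick], [42, trade]]
    }
}
```

An unbounded LinkedBlockingQueue keeps the sketch simple; with a high event rate, a bounded queue (plus a policy for when offer() returns false) may be safer.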

Ankur answered Nov 03 '22 23:11