
Grouping in a simple aggregation Storm topology

I'm trying to write a topology that does the following:

  1. A spout that subscribes to a Twitter feed (based on a keyword)
  2. An aggregation bolt that collects a number of tweets (say N) in a collection and sends them to the printer bolt
  3. A simple bolt that prints the collection to the console at once.

In reality I want to do some more processing on the collection.

I tested it locally and it looks like it's working. However, I'm not sure whether I've set the groupings on the bolts correctly, or whether this would work correctly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and point out any errors or suggest changes and improvements.

Thanks.

This is what my topology looks like.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
       .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
       .shuffleGrouping("sampleaggregate");

Aggregation Bolt

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {

            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);


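            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                //emit a copy so downstream bolts never share the list this bolt keeps mutating,
                //then reset the cache so each batch is emitted only once
                collector.emit(new Values(new ArrayList<Status>(statusCache)));
                statusCache.clear();
            }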

        }
    }

    @Override
    public void cleanup() {


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;  // no component-specific configuration
    }


    protected void setupNonSerializableAttributes() {

    }

}

Printer Bolt

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " "  + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

}

asked Jun 04 '13 18:06 by Soumya Simanta



1 Answer

From what I can see it looks good. The devil's in the details, though. I'm not sure what your aggregator bolt does, but if it makes any assumptions about the values being passed to it, then you should consider an appropriate fields grouping. This might not make much of a difference while you're using the default parallelism hint of 1, but should you decide to scale out to multiple aggregator bolt instances, implicit assumptions in your logic may call for a non-shuffle grouping.
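
To make the point about parallelism concrete, here is a minimal plain-Java sketch (no Storm dependency) of how the two kinds of grouping spread tuples over two aggregator tasks. It is a simplified model and not Storm's actual routing code: taskForKey (hash of the grouping field modulo the task count) and the round-robin stand-in for shuffle grouping are assumptions made for illustration, and so is the idea of grouping tweets by a user field, which the spout in the question does not declare.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupingSketch {

    // Simplified model of a fields grouping (an assumption, not Storm's code):
    // the same key always maps to the same task index.
    static int taskForKey(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Routes each tweet, given as {user, text}, to one of numTasks buckets.
    // useFields = true  -> route by user, like a fields grouping
    // useFields = false -> round-robin, standing in for a shuffle grouping
    static List<List<String>> route(List<String[]> tweets, int numTasks, boolean useFields) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new ArrayList<>());
        }
        int next = 0;
        for (String[] tweet : tweets) {
            int task = useFields ? taskForKey(tweet[0], numTasks) : (next++ % numTasks);
            tasks.get(task).add(tweet[0] + ": " + tweet[1]);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String[]> tweets = Arrays.asList(
            new String[]{"alice", "t1"}, new String[]{"alice", "t2"},
            new String[]{"bob", "t3"}, new String[]{"bob", "t4"},
            new String[]{"alice", "t5"});
        // Each inner list is what one aggregator task would collect.
        System.out.println("shuffle-like: " + route(tweets, 2, false));
        System.out.println("fields-like:  " + route(tweets, 2, true));
    }
}
```

With the shuffle-like routing, each task's collection mixes tweets from both users, which is fine if the aggregator only batches N arbitrary tweets. If the aggregator ever needs all tweets for one key in the same collection, the fields-like routing is what guarantees it. In the topology itself that would mean replacing .shuffleGrouping("spout") with .fieldsGrouping("spout", new Fields("user")), assuming the spout declared a user output field.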

answered Nov 14 '22 01:11 by Chris Gerken