(Twitter) Storm's Window On Aggregation

I'm playing around with Storm, and I'm wondering where Storm specifies (if possible) the (tumbling/sliding) window size for an aggregation. For example, suppose we want to find the trending topics on Twitter for the previous hour. How do we specify that a bolt should return results every hour? Is this done programmatically inside each bolt, or is there some way to specify a "window"?

gronnbeck asked Sep 26 '12 14:09



2 Answers

Disclaimer: I wrote the Trending Topics with Storm article referenced by gakhov in his answer above.

I'd say the best practice is to use so-called tick tuples, available in Storm 0.8+. With these you can configure your own bolts to be notified at fixed time intervals (say, every ten seconds or every minute).

Here's a simple example that configures the component in question to receive tick tuples every ten seconds:

// in your bolt
@Override
public Map<String, Object> getComponentConfiguration() {
    // per-component setting: only this component receives these tick tuples
    Config conf = new Config();
    int tickFrequencyInSeconds = 10;
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    return conf;
}

You can then use a conditional in your bolt's execute() method to distinguish "normal" incoming tuples from the special tick tuples. For instance:

// in your bolt (spouts have no execute() method)
@Override
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        // tick tuple: trigger a periodic activity, e.g. emit the current window's results
    } else {
        // normal tuple: update your running state
    }
}

// Tick tuples come from Storm's system component on the dedicated tick stream.
private static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
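To connect this back to the window question: one way to get a sliding window out of ticks is to keep one counter slot per tick interval and rotate the slots on every tick. The following is a minimal plain-Java sketch of that idea, not Storm API; the class name SlidingWindowCounter and its methods count() and tick() are hypothetical. Under these assumptions, a ten-second tick with 360 slots covers the previous hour, and a single slot gives a tumbling window.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical helper (not part of Storm): a sliding-window counter advanced by ticks. */
public class SlidingWindowCounter {
    // One array of per-slot counts for each key; each slot covers one tick interval.
    private final Map<String, long[]> slotsByKey = new HashMap<>();
    private final int numSlots;
    private int head = 0; // index of the slot currently being written

    public SlidingWindowCounter(int numSlots) {
        if (numSlots < 1) {
            throw new IllegalArgumentException("numSlots must be >= 1");
        }
        this.numSlots = numSlots;
    }

    /** Call for every normal tuple, e.g. once per observed topic. */
    public void count(String key) {
        slotsByKey.computeIfAbsent(key, k -> new long[numSlots])[head]++;
    }

    /** Call for every tick: returns the totals over the window, then slides it by one slot. */
    public Map<String, Long> tick() {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, long[]> entry : slotsByKey.entrySet()) {
            long sum = 0;
            for (long c : entry.getValue()) {
                sum += c;
            }
            totals.put(entry.getKey(), sum);
        }
        // Advance to the oldest slot, wipe it for reuse, and drop keys with no counts left.
        head = (head + 1) % numSlots;
        Iterator<long[]> it = slotsByKey.values().iterator();
        while (it.hasNext()) {
            long[] slots = it.next();
            slots[head] = 0;
            boolean empty = true;
            for (long c : slots) {
                if (c != 0) {
                    empty = false;
                    break;
                }
            }
            if (empty) {
                it.remove();
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        SlidingWindowCounter counter = new SlidingWindowCounter(3); // window = 3 tick intervals
        counter.count("storm");
        counter.count("storm");
        System.out.println(counter.tick()); // {storm=2}
        counter.count("storm");
        System.out.println(counter.tick()); // {storm=3}
        System.out.println(counter.tick()); // {storm=3}
        System.out.println(counter.tick()); // {storm=1}
        System.out.println(counter.tick()); // {}
    }
}
```

In a bolt you would presumably call count() in the "normal tuple" branch of execute() and emit the result of tick() in the tick-tuple branch. The window length is then the number of slots times the tick frequency.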

Again, I wrote a pretty detailed blog post about doing this in Storm a few days ago as gakhov pointed out (shameless plug!).

Michael G. Noll answered Oct 16 '22 01:10



Add a new spout with a parallelism of 1, and have it emit an empty signal tuple and then call Utils.sleep until the next signal is due (all done in nextTuple). Then subscribe all relevant bolts to that spout using an all grouping, so that every bolt instance receives the same signal.
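For the "sleep until next time" part, a small helper can compute how long to sleep so that signals land on interval boundaries (for example, on the hour). This is a minimal plain-Java sketch; SignalTiming and millisUntilNextBoundary are hypothetical names, and boundaries are aligned to the Unix epoch, so hourly boundaries follow UTC.

```java
/** Hypothetical helper (not part of Storm): timing for a periodic signal spout. */
public class SignalTiming {

    /**
     * Milliseconds from nowMs until the next multiple of intervalMs.
     * Returns a full interval when nowMs is exactly on a boundary,
     * so a signal is never sent twice for the same boundary.
     */
    public static long millisUntilNextBoundary(long nowMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return intervalMs - Math.floorMod(nowMs, intervalMs);
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60L * 1000L;
        long now = System.currentTimeMillis();
        System.out.println("Next hourly signal in "
            + millisUntilNextBoundary(now, oneHourMs) + " ms");
    }
}
```

Inside nextTuple you could pass the result to Utils.sleep and emit the signal tuple once it returns.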

Moshe Bixenshpaner answered Oct 16 '22 02:10
