I'm trying to write a topology that does the following:
- A spout that listens for tweets matching a keyword ("pittsburgh")
- An aggregator bolt that collects tweets into batches
- A bolt that prints each batch to the console
In reality I want to do some more processing on the collection.
I tested it locally and it looks like it's working. However, I'm not sure if I've set the groupings on the bolts correctly, or whether this would work correctly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and point out any errors, changes, or improvements.
Thanks.
This is what my topology looks like.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
       .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
       .shuffleGrouping("sampleaggregate");
Aggregation Bolt
// Imports assume the pre-Apache Storm packages (backtype.storm),
// twitter4j for Status, and log4j for logging.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import twitter4j.Status;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;

    /**
     * Holds the messages in the bolt until you are ready to send them out.
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;
        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
            log.warn("Skipping tuple whose first value is not a Status", e);
        }
        if (currentStatus != null) {
            // Add it to the status cache.
            statusCache.add(currentStatus);
            collector.ack(tuple);
            // Check the size of the status cache and pass the batch to the
            // next stage once there are enough messages to emit. Emit a copy
            // and clear the cache; emitting the live list would keep growing
            // and re-send the same statuses on every subsequent tuple.
            if (statusCache.size() > 10) {
                collector.emit(new Values(new ArrayList<Status>(statusCache)));
                statusCache.clear();
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    protected void setupNonSerializableAttributes() {
    }
}
Printer Bolt
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Print the arity and contents of each incoming tuple.
        System.out.println(tuple.size() + " " + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        // This bolt is a sink; it declares no output fields.
    }
}
From what I can see, it looks good. The devil's in the details, though. I'm not sure what your aggregator bolt does, but if it makes any assumptions about which values are routed to it, you should consider an appropriate fields grouping. This might not make much of a difference while you're using the default parallelism hint of 1, but if you later scale out to multiple aggregator bolt instances, any implicit assumptions in your logic may call for a non-shuffle grouping.
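For example, if the aggregation were per-user, you'd want every tweet from the same user routed to the same aggregator task. A minimal sketch, assuming the spout declared a "user" output field (it currently doesn't, and the parallelism hint of 4 is just for illustration):

// Hypothetical: partition tuples by a "user" field so each aggregator
// task sees a consistent subset of the stream.
builder.setBolt("sampleaggregate", new SampleAggregatorBolt(), 4)
       .fieldsGrouping("spout", new Fields("user"));

With a shuffle grouping, by contrast, tweets from the same user could land on any of the 4 tasks, so any per-user state kept inside the bolt would be split across instances.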