 

How do I send the output of two bolts to a single bolt in Storm?

Tags:

apache-storm

What is the easiest way to send the output of BoltA and BoltB to BoltC? Do I have to use joins, or is there a simpler solution? A and B emit the same fields (ts, metric_name, metric_count).

    // KafkaSpout --> LogDecoder
    builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);

    // LogDecoder --> CountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);

    // LogDecoder --> HttpResCodeCountBolt
    builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);


    // And now I want to send the CountBolt and HttpResCodeCountBolt output to AggregateBolt.

    // CountBolt --> AggregateBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));

    // HttpResCodeCountBolt --> AggregateBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));

Is this possible?

user1579557 asked May 29 '14 21:05


2 Answers

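Yes. Call setBolt() only once for the aggregator and attach both groupings to the BoltDeclarer it returns (declaring the same bolt ID twice throws an exception). To tell the two inputs apart, you can also add a stream-id ("stream1" and "stream2" below) to each fieldsGrouping call: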

    BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5);
    bd.fieldsGrouping(COUNT_BOLT_ID, "stream1", new Fields("ts"));
    bd.fieldsGrouping(HTTP_RES_CODE_COUNT_BOLT_ID, "stream2", new Fields("ts"));

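For this to work, CountBolt and HttpResCodeCountBolt must declare those streams in declareOutputFields(), e.g. declarer.declareStream("stream1", new Fields("ts", "metric_name", "metric_count")), and emit on them with collector.emit("stream1", values). Then, in BoltC's execute() method, you can check which stream the tuple came from: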

    public void execute(Tuple tuple) {
        if ("stream1".equals(tuple.getSourceStreamId())) {
            // this came from stream1
        } else if ("stream2".equals(tuple.getSourceStreamId())) {
            // this came from stream2
        }
    }

Since you know which stream the tuple came from, the two streams don't need to carry tuples of the same shape; you just unmarshal each tuple according to its stream-id.
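To make the "different shape per stream" point concrete, here is a plain-Java sketch that runs without Storm. SimpleTuple is a hypothetical stand-in for Storm's Tuple (only getSourceStreamId() and getValue() are modelled), StreamDispatch/describe are hypothetical names, and the stream2 layout (status_code, ts, count) is an assumption chosen only to differ from stream1.

```java
import java.util.List;

// Hypothetical stand-in for Storm's Tuple, exposing only what this sketch needs.
final class SimpleTuple {
    private final String sourceStreamId;
    private final List<Object> values;

    SimpleTuple(String sourceStreamId, List<Object> values) {
        this.sourceStreamId = sourceStreamId;
        this.values = values;
    }

    String getSourceStreamId() { return sourceStreamId; }

    Object getValue(int i) { return values.get(i); }
}

// Sketch of the per-stream branching an aggregator's execute() could do.
final class StreamDispatch {
    private StreamDispatch() {}

    static String describe(SimpleTuple tuple) {
        if ("stream1".equals(tuple.getSourceStreamId())) {
            // stream1 assumed to carry (ts, metric_name, metric_count)
            return "count:" + tuple.getValue(1) + "=" + tuple.getValue(2);
        } else if ("stream2".equals(tuple.getSourceStreamId())) {
            // stream2 hypothetically carries (status_code, ts, count),
            // so it is read with different positions than stream1
            return "http:" + tuple.getValue(0) + "=" + tuple.getValue(2);
        }
        // anything else is reported rather than silently dropped
        return "unknown:" + tuple.getSourceStreamId();
    }
}
```

In a real bolt the same if/else would sit inside execute(Tuple), reading fields from Storm's Tuple instead of SimpleTuple.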

You can also check which component the tuple came from with tuple.getSourceComponent() (as I type this, I think that may be more appropriate in your case), as well as which instance of the component (the task, tuple.getSourceTask()) emitted it.
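Both subscriptions group on "ts", which is what lets the aggregator see matching timestamps from both bolts in one place: tuples with the same ts value go to the same AggregateBolt task regardless of which bolt emitted them. The following plain-Java model illustrates that idea; FieldsGroupingModel and chooseTask are hypothetical names, and the hash-then-modulo step is a simplification, not necessarily Storm's exact task-selection code.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of a fields grouping; NOT Storm's source code.
// The target task depends only on the values of the grouping fields (here: ts),
// never on which upstream component emitted the tuple.
final class FieldsGroupingModel {
    private FieldsGroupingModel() {}

    /** Returns a task index in [0, numTasks) derived from the grouping-field values. */
    static int chooseTask(List<?> groupingValues, int numTasks) {
        int hash = Arrays.deepHashCode(groupingValues.toArray());
        // floorMod keeps the index non-negative even when the hash is negative
        return Math.floorMod(hash, numTasks);
    }
}
```

For example, chooseTask(Arrays.asList(1401400000L), 5) yields the same index whether that ts came from CountBolt or HttpResCodeCountBolt, because the emitting component is not an input to the choice.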

Chris Gerken answered Sep 21 '22 07:09


As @Chris said, you can use streams, but you can also simply get the source component from the tuple:

    @Override
    public final void execute(final Tuple tuple) {
        final String sourceComponent = tuple.getSourceComponent();
        // ...
    }

The source component is the ID you gave the bolt when building the topology, for instance COUNT_BOLT_ID.
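Because getSourceComponent() returns that ID, the aggregator can use it directly as a key. The plain-Java sketch below models this: PerSourceAggregator, add and total are hypothetical names; "count-bolt" and "http-res-code-count-bolt" stand in for whatever COUNT_BOLT_ID and HTTP_RES_CODE_COUNT_BOLT_ID actually contain; and summing metric_count per ts per source is an assumed aggregation, not something the question specifies. In a real bolt the arguments would come from tuple.getSourceComponent() and the tuple's ts and metric_count fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory state for an aggregator, keyed by ts and then by
// the component ID that emitted the count (as returned by getSourceComponent()).
final class PerSourceAggregator {
    // ts -> (source component -> summed metric_count)
    private final Map<Long, Map<String, Long>> totals = new HashMap<>();

    void add(String sourceComponent, long ts, long metricCount) {
        // create the per-ts map on first use, then add to that source's running sum
        totals.computeIfAbsent(ts, k -> new HashMap<>())
              .merge(sourceComponent, metricCount, Long::sum);
    }

    long total(long ts, String sourceComponent) {
        Map<String, Long> bySource = totals.get(ts);
        // unseen ts or unseen source counts as zero
        return bySource == null ? 0L : bySource.getOrDefault(sourceComponent, 0L);
    }
}
```

Keying by component ID keeps the two inputs separate even though both emit the same (ts, metric_name, metric_count) fields.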

zenbeni answered Sep 18 '22 07:09