What is the easiest way to send output of BoltA and BoltB to BoltC. Do I have to use Joins or is there any simpler solution. A and B have same fields (ts, metric_name, metric_count).
// KafkaSpout --> LogDecoder
builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);
// LogDecoder --> CountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);
// LogDecoder --> HttpResCodeCountBolt
builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);
# And now I want to send CountBolt and HttpResCodeCountBolt output to Aggregator Bolt.
// CountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));
// HttpResCodeCountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));
Is this possible ?
Yes. Just add a stream-id ("stream1" and "stream2" below) to the fieldsGrouping call:
BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5);
bd.fieldsGrouping((COUNT_BOLT_ID), "stream1", new Fields("ts"));
bd.fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), "stream2", new Fields("ts"));
and then in the execute() method for BoltC you can test to see which stream the tuple came from:
public void execute(Tuple tuple) {
if ("stream1".equals(tuple.getSourceStreamId())) {
// this came from stream1
} else if ("stream2".equals(tuple.getSourceStreamId())) {
// this came from stream2
}
Since you know which stream the tuple came from, you don't need to have the same shape of tuple on the two streams. You just de-marshall the tuple according to the stream-id.
You can also check to see which component the tuple came from (as I type this I think this might be more appropriate to your case) as well as the instance of the component (the task) that emitted the tuple.
As @Chris said you can use streams. But you can also simply get the source component from the tuple.
@Override
public final void execute(final Tuple tuple) {
final String sourceComponent = tuple.getSourceComponent();
....
}
The source component is the name you gave to the Bolt at the topology's initialization. For instance: COUNT_BOLT_ID.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With