Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to send a tuple to different bolt according to a value in the message

I have a Storm cluster connecting to Kinesis Stream. The message looks like this.

{
    _c: "a"
}

or it should be

{
    _c: "b"
}

I would like to send a tuple with _c="a" to one bolt and _c="b" to a different bolt. How do I achieve this?

This is the bolt that parsing the message from Kinesis to JSON Object using GSon

@Override
public void execute(Tuple tuple) {
  String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
  String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
  byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);

  ByteBuffer buffer = ByteBuffer.wrap(payload);
  String data = null;
  try {
    data = decoder.decode(buffer).toString();

    HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >> () {}.getType());

    this.outputCollector.emit(tuple, new Values(map));
    this.outputCollector.ack(tuple);

  } catch (CharacterCodingException e) {
    this.outputCollector.fail(tuple);
  }

}

Thanks

like image 547
toy Avatar asked Nov 16 '25 06:11

toy


1 Answers

You can define two streams in your bolt and then declare two outputstreams :

@Override
public void execute(Tuple tuple) {
    // ...
    // Some Code
    // ...
    if (_c =="a") {
    collector.emit("stream1", tuple, new Values(_c));
    } else {
    collector.emit("stream2", tuple, new Values(_c));
    }

}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream("stream1", new Fields("_c"));
    outputFieldsDeclarer.declareStream("stream2", new Fields("_c"));
} }

In your topology you can then use the option in ShuffleGrouping to pass a Stream_id.

topology.setBolt("FirstBolt",new FirstBolt(),1);    
topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1");
topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2");

Another possibility is to just send it to both bolts and then check the value in both bolts and execute the required code.

like image 161
SebastienPattyn Avatar answered Nov 18 '25 20:11

SebastienPattyn



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!