 

How to send output of two different Spout to the same Bolt?

I have two Kafka Spouts whose values I want to send to the same bolt.

Is it possible?

Ritesh Sinha asked Dec 19 '22 01:12



1 Answer

Yes, it is possible:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

Any other grouping (for example, fieldsGrouping) can be used in the same way.

Update:

To distinguish tuples in the consumer bolt (i.e., whether they come from topic_1 or topic_2), there are two possibilities:

1) You can use operator IDs (as suggested by @user-4870385):

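// in MyBolt.execute():
// component IDs are case-sensitive, so compare with equals()
if(input.getSourceComponent().equals("topic_1")) {
    // tuple comes from spout "topic_1"
} else {
    // tuple comes from spout "topic_2"
}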

2) You can use stream names (as suggested by @zenbeni). In this case, both spouts need to declare named streams, and the bolt needs to connect to the spouts by stream name:

public class MyKafkaSpout extends KafkaSpout {
  private final SpoutConfig spoutConfig;
  private final String streamName;

  public MyKafkaSpout(SpoutConfig spoutConfig, String streamName) {
    super(spoutConfig); // KafkaSpout requires its SpoutConfig
    this.spoutConfig = spoutConfig;
    this.streamName = streamName;
  }

  // other stuff omitted; note that tuples must also be emitted on streamName

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...), but declare a named stream
    declarer.declareStream(streamName, spoutConfig.scheme.getOutputFields());
  }
}

When building the topology, the stream names now need to be used:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout(..., "stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout(..., "stream_t2"));
b.setBolt("bolt", new MyBolt(...))
    .shuffleGrouping("topic_1", "stream_t1")
    .shuffleGrouping("topic_2", "stream_t2");

In MyBolt, the stream name can now be used to distinguish input tuples:

// in MyBolt.execute():
if(input.getSourceStreamId().equals("stream_t1")) {
  // tuple comes from topic_1
} else {
  // tuple comes from topic_2
}
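
With more than two upstream spouts, the if/else check in either approach grows quickly. One option is a lookup table keyed by the source ID. The following is a minimal plain-Java sketch with no Storm dependency; the class SourceDispatchSketch and its handlers are hypothetical, and the sourceKey argument stands in for the value a bolt would get from input.getSourceComponent() (or input.getSourceStreamId()):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch (no Storm dependency); class and handler names are hypothetical.
class SourceDispatchSketch {

    // Maps a source key (component ID or stream ID) to the logic for that input.
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    SourceDispatchSketch() {
        handlers.put("topic_1", value -> "from topic_1: " + value);
        handlers.put("topic_2", value -> "from topic_2: " + value);
    }

    // In a real bolt, sourceKey would be input.getSourceComponent()
    // (or input.getSourceStreamId()) inside execute(Tuple input).
    String handle(String sourceKey, String value) {
        Function<String, String> handler = handlers.get(sourceKey);
        if (handler == null) {
            // Fail loudly instead of silently treating unknown input as "the other" source.
            throw new IllegalArgumentException("Unexpected source: " + sourceKey);
        }
        return handler.apply(value);
    }

    public static void main(String[] args) {
        SourceDispatchSketch bolt = new SourceDispatchSketch();
        System.out.println(bolt.handle("topic_1", "a")); // prints "from topic_1: a"
        System.out.println(bolt.handle("topic_2", "b")); // prints "from topic_2: b"
    }
}
```

In an actual bolt, the map would typically be built in prepare(...) rather than in the constructor, because bolt instances are serialized when the topology is submitted and plain lambdas are not serializable.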

Discussion:

While the second approach, using stream names, is more natural (according to @zenbeni), the first is more flexible (IMHO). Stream names are declared by the spout/bolt itself (i.e., when the spout/bolt code is written); in contrast, component IDs (operator IDs) are assigned when the topology is put together (i.e., when the spout/bolt is used).

Let's assume we are given three bolts as class files (no source code). The first two are to be used as producers, and both declare an output stream with the same name. If the third bolt, the consumer, distinguishes input tuples by stream name, it cannot tell the two producers apart. Even if the two producers declare different output stream names, the stream names the consumer expects might be hard-coded and might not match them, so this does not work either. However, if the consumer distinguishes incoming tuples by component ID (even a hard-coded one), the expected component IDs can simply be assigned when the topology is built.
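
The scenario can be modeled in a few lines of plain Java. This is only an illustrative sketch with no Storm dependency: the Msg and Producer classes, the stream name "out", and the handler labels are all hypothetical stand-ins. The producer hard-codes its stream name, while the component ID is passed in from outside, mirroring how it is assigned at topology-build time:

```java
// Plain-Java model of the argument (no Storm dependency); all names are hypothetical.
class WiringSketch {

    // Stand-in for a tuple: records where it came from.
    static final class Msg {
        final String sourceComponent; // assigned when the topology is wired
        final String sourceStream;    // fixed inside the producer's code
        final String value;

        Msg(String sourceComponent, String sourceStream, String value) {
            this.sourceComponent = sourceComponent;
            this.sourceStream = sourceStream;
            this.value = value;
        }
    }

    // Stand-in for a closed-source producer: the stream name is hard-coded.
    static final class Producer {
        private static final String STREAM = "out";

        Msg emit(String componentId, String value) {
            return new Msg(componentId, STREAM, value);
        }
    }

    // Consumer logic that tells inputs apart by component ID.
    static String routeByComponent(Msg m) {
        return m.sourceComponent.equals("topic_1") ? "handler-1" : "handler-2";
    }

    // Consumer logic that tells inputs apart by an expected stream name.
    static String routeByStream(Msg m) {
        return m.sourceStream.equals("stream_t1") ? "handler-1" : "handler-2";
    }

    public static void main(String[] args) {
        Producer producer = new Producer(); // same class used for both producers
        Msg a = producer.emit("topic_1", "x");
        Msg b = producer.emit("topic_2", "y");

        // Component IDs differ, so the two inputs are separated correctly.
        System.out.println(routeByComponent(a) + " " + routeByComponent(b)); // handler-1 handler-2

        // Both messages carry the stream name "out", so both take the same branch.
        System.out.println(routeByStream(a) + " " + routeByStream(b)); // handler-2 handler-2
    }
}
```

Because the component IDs are supplied by whoever wires the pieces together, they can be chosen to match what the consumer expects; the hard-coded stream name cannot.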

Of course, it would be possible to inherit from the given classes (if they are not declared final) and override declareOutputFields(...) in order to assign your own stream names. However, this requires additional work.

Matthias J. Sax answered Feb 23 '23 09:02
