 

How to set TOPOLOGY_MAX_SPOUT_PENDING parameter

In my topology, I read trigger messages from a Kafka queue. On receiving the trigger message, I need to emit around 4096 messages to a bolt. In the bolt, after some processing it will publish to another Kafka queue (another topology will consume this later).

I'm trying to set the TOPOLOGY_MAX_SPOUT_PENDING parameter to throttle the number of messages going to the bolt, but it seems to have no effect. Is it because I'm emitting all the tuples in a single nextTuple() call? If so, what would be the workaround?

user2989124 asked Sep 12 '25 06:09


2 Answers

If you are reading from Kafka, you should use the KafkaSpout that comes packaged with Storm. Don't try to implement your own spout. Trust me: I use the KafkaSpout in production and it works very smoothly. Each Kafka message generates exactly one tuple.

As shown in the Storm documentation, you can set topology.max.spout.pending like this:

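Config conf = new Config();
// Maximum number of pending (not yet acked or failed) tuples per spout task
conf.setMaxSpoutPending(5000);
// `topology` is the StormTopology, e.g. the result of TopologyBuilder.createTopology()
StormSubmitter.submitTopology("mytopology", conf, topology);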

The topology.max.spout.pending limit applies per spout task: if you have four spout tasks, the maximum number of incomplete tuples inside your topology is the number of spout tasks × topology.max.spout.pending.
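As a quick illustration of that multiplication, here is a plain-Java sketch with no Storm dependency. The class and method names are hypothetical, and the bound assumes every spout task uses the same limit and emits at most one tuple (with a message ID) per nextTuple() call.

```java
/** Hypothetical helper (not part of Storm) for the topology-wide pending bound. */
final class PendingLimit {
    private PendingLimit() {}

    /**
     * Upper bound on tuples pending (emitted but not yet acked or failed)
     * across all spout tasks, assuming each task uses the same
     * topology.max.spout.pending value.
     */
    static long maxTuplesInFlight(int spoutTasks, int maxSpoutPending) {
        if (spoutTasks < 0 || maxSpoutPending < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        // The limit is enforced per spout task, so the overall bound scales linearly.
        return (long) spoutTasks * maxSpoutPending;
    }
}
```

For example, four spout tasks with setMaxSpoutPending(5000) give a bound of 4 × 5000 = 20000 tuples in flight.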

Another tip: use the Storm UI to check that topology.max.spout.pending was set properly.


Remember that topology.max.spout.pending only limits the number of tuples that are not yet fully processed inside the topology; it does not make the topology stop consuming messages from Kafka, at least on a production system. As tuples are acked, the spout keeps pulling more. If you want to consume in batches of 4096, you need to implement caching (buffering) logic in your bolts, or use something other than Storm (something micro-batch oriented).
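A minimal, framework-free sketch of the buffering idea follows. `BatchBuffer` is a hypothetical helper, not a Storm class; the assumption is that you call `add(...)` from your bolt's execute() and publish the returned batch to Kafka whenever it is non-empty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not a Storm class): accumulates items and returns
 * a complete batch once batchSize items have been collected.
 */
final class BatchBuffer<T> {
    private final int batchSize;
    private List<T> current;

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.current = new ArrayList<>(batchSize);
    }

    /** Adds one item; returns the full batch, or an empty list if it is not full yet. */
    List<T> add(T item) {
        current.add(item);
        if (current.size() < batchSize) {
            return Collections.emptyList();
        }
        List<T> full = current;
        current = new ArrayList<>(batchSize); // start a fresh batch
        return full;
    }

    /** Returns whatever is buffered (possibly a partial batch) and resets the buffer. */
    List<T> flush() {
        List<T> partial = current;
        current = new ArrayList<>(batchSize);
        return partial;
    }

    int size() {
        return current.size();
    }
}
```

One design caveat: if the bolt holds its input tuples un-acked until the batch is published, those tuples still count as pending on the spout. With a single spout task, topology.max.spout.pending must then be at least the batch size (4096 here); otherwise the spout stops emitting before the batch can fill, and the buffered tuples eventually time out (topology.message.timeout.secs). A periodic flush, for example on tick tuples, can handle partial batches.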

SQL.injection answered Sep 13 '25 19:09


To make TOPOLOGY_MAX_SPOUT_PENDING work, you need to enable the fault-tolerance mechanism (i.e., assign message IDs in spouts, and anchor and ack in bolts). Furthermore, if you emit more than one tuple per call to Spout.nextTuple(), TOPOLOGY_MAX_SPOUT_PENDING will not work as expected.
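The effect can be illustrated with a simplified model (plain Java, not Storm's actual executor code). The assumptions are that the executor checks the pending count only before each nextTuple() call, that every tuple emitted with a message ID counts as pending until it is acked, and that no acks arrive during the simulation. Tuples emitted without a message ID are not tracked at all, which is why acking has to be enabled for the limit to do anything.

```java
/**
 * Simplified model (not Storm code) of how a spout executor honours
 * topology.max.spout.pending: nextTuple() is only called while the pending
 * count is below the limit, and emits made inside a call cannot be undone.
 */
final class PendingGateModel {
    private PendingGateModel() {}

    /**
     * Runs `rounds` scheduling rounds with no acks arriving and returns the
     * peak number of pending tuples. emitsPerCall is how many tuples each
     * nextTuple() call emits with a message ID.
     */
    static int peakPending(int maxSpoutPending, int emitsPerCall, int rounds) {
        int pending = 0;
        int peak = 0;
        for (int i = 0; i < rounds; i++) {
            // The gate is checked before nextTuple(), never in the middle of it.
            if (pending < maxSpoutPending) {
                pending += emitsPerCall; // everything emitted in one call becomes pending at once
                peak = Math.max(peak, pending);
            }
        }
        return peak;
    }
}
```

With a limit of 10 and one tuple per call, the peak stays at 10; if a single call emits 4096 tuples, the peak jumps to 4096, because the gate cannot interrupt a call that is already running.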

Emitting more than a single tuple per Spout.nextTuple() call is actually bad practice for other reasons as well (see "Why should I not loop or block in Spout.nextTuple()" for more details).
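One possible workaround, sketched under stated assumptions: buffer the 4096 derived messages inside the spout and emit at most one of them, with its own message ID, per nextTuple() call, so the max-pending gate can act between emits. This is plain Java so it runs without Storm; `pollTrigger`, `expand`, and `emitter` are hypothetical stand-ins for reading the trigger from Kafka, generating the derived messages, and SpoutOutputCollector.emit(values, messageId) respectively.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Plain-Java sketch (not Storm code) of a spout that fans one trigger out into
 * many messages but emits at most one tuple per nextTuple() call.
 */
final class FanOutSpoutSketch {
    private final Deque<String> backlog = new ArrayDeque<>();   // derived messages not yet emitted
    private final Map<Long, String> inFlight = new HashMap<>(); // emitted, waiting for ack/fail
    private final Supplier<String> pollTrigger;                 // hypothetical: next trigger, or null if none
    private final Function<String, Iterable<String>> expand;    // hypothetical: trigger -> derived messages
    private final BiConsumer<String, Long> emitter;             // hypothetical stand-in for collector.emit(values, msgId)
    private long nextId = 0;

    FanOutSpoutSketch(Supplier<String> pollTrigger,
                      Function<String, Iterable<String>> expand,
                      BiConsumer<String, Long> emitter) {
        this.pollTrigger = pollTrigger;
        this.expand = expand;
        this.emitter = emitter;
    }

    /** Mirrors nextTuple(): emits at most one tuple and returns without blocking. */
    void nextTuple() {
        if (backlog.isEmpty()) {
            String trigger = pollTrigger.get();
            if (trigger == null) {
                return; // nothing available; return quickly instead of waiting
            }
            for (String msg : expand.apply(trigger)) {
                backlog.addLast(msg); // buffered only; nothing is emitted here
            }
        }
        String msg = backlog.pollFirst();
        if (msg == null) {
            return; // the trigger expanded to zero messages
        }
        long id = nextId++;
        inFlight.put(id, msg);
        emitter.accept(msg, id); // exactly one emit, with a message ID so it is tracked
    }

    /** Mirrors ack(msgId): the tuple tree completed, so forget the message. */
    void ack(long msgId) {
        inFlight.remove(msgId);
    }

    /** Mirrors fail(msgId): put the message back at the front to be re-emitted. */
    void fail(long msgId) {
        String msg = inFlight.remove(msgId);
        if (msg != null) {
            backlog.addFirst(msg);
        }
    }

    int backlogSize() {
        return backlog.size();
    }

    int inFlightSize() {
        return inFlight.size();
    }
}
```

In a real spout, ack and fail would be the ISpout callbacks, which receive the message ID as an Object. This sketch does not acknowledge the trigger message itself; if the trigger must be replayed when derived messages fail, that needs separate bookkeeping.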

Matthias J. Sax answered Sep 13 '25 19:09