In my topology, I read trigger messages from a Kafka queue. On receiving a trigger message, I need to emit around 4096 messages to a bolt. After some processing, the bolt publishes to another Kafka queue (another topology will consume it later).
I'm trying to set the TOPOLOGY_MAX_SPOUT_PENDING parameter to throttle the number of messages going to the bolt, but it seems to have no effect. Is it because I'm emitting all the tuples in a single nextTuple() call? If so, what would be the workaround?
If you are reading from Kafka, you should use the KafkaSpout that comes packaged with Storm. Don't try to implement your own spout; trust me, I use the KafkaSpout in production and it works very smoothly. Each Kafka message generates exactly one tuple.
As the Storm manual shows, you can set topology.max.spout.pending like this:
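import org.apache.storm.Config;         // Storm 1.x+ package names (older releases used backtype.storm)
import org.apache.storm.StormSubmitter;
Config conf = new Config();
conf.setMaxSpoutPending(5000); // the limit applies to each spout task
StormSubmitter.submitTopology("mytopology", conf, topology); // topology = e.g. builder.createTopology()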
topology.max.spout.pending applies per spout task: if you have four spout tasks, the maximum number of incomplete tuples inside your topology is the number of spout tasks * topology.max.spout.pending.
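A quick sanity check of that arithmetic, as a minimal Java sketch. The class and method names are hypothetical, and it assumes (as the Storm Config documentation describes) that each spout task enforces the limit independently, so four spout tasks give four times the configured value.

```java
// Hypothetical helper: upper bound on incomplete (un-acked) tuples in a
// topology, assuming each spout task enforces topology.max.spout.pending
// independently.
final class PendingCap {
    private PendingCap() {}

    static long maxInFlight(int spoutTasks, int maxSpoutPending) {
        if (spoutTasks < 1 || maxSpoutPending < 1) {
            throw new IllegalArgumentException("both values must be positive");
        }
        // Widen to long before multiplying to avoid int overflow.
        return (long) spoutTasks * maxSpoutPending;
    }

    public static void main(String[] args) {
        // Four spout tasks, each with topology.max.spout.pending = 5000.
        System.out.println(maxInFlight(4, 5000)); // prints 20000
    }
}
```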
Another tip: use the Storm UI to check that topology.max.spout.pending was set properly.
Remember that topology.max.spout.pending only limits the number of unprocessed (not yet acked) tuples inside the topology; the topology will never stop consuming messages from Kafka, at least on a production system. If you want to consume batches of 4096, you need to implement caching logic in your bolts, or use something other than Storm (something micro-batch oriented).
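A minimal, Storm-free sketch of the kind of caching logic a bolt could use to group incoming items into batches of 4096. BatchBuffer and its methods are hypothetical names, not part of Storm; in a bolt you would call add() from execute() and publish each batch to Kafka inside the sink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: accumulates items and hands them to a sink in
// fixed-size batches. The Storm API is deliberately left out.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer;

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.sink = sink;
        this.buffer = new ArrayList<>(batchSize);
    }

    // Adds one item and flushes automatically once the batch is full.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands the current contents (possibly a partial batch) to the sink
    // as an immutable copy, then starts a new batch.
    void flush() {
        if (buffer.isEmpty()) return;
        sink.accept(Collections.unmodifiableList(new ArrayList<>(buffer)));
        buffer.clear();
    }

    // Number of items waiting for the next flush.
    int pending() { return buffer.size(); }
}
```

In a real bolt you would typically hold off acking the buffered tuples until their batch has been published, and flush partial batches periodically (for example on a tick tuple, configured via topology.tick.tuple.freq.secs) so a trailing partial batch does not sit in memory indefinitely. If acks are held until a batch fills, topology.max.spout.pending (times the number of spout tasks) must be at least the batch size, or the spout will stall before a batch can ever fill.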
To make TOPOLOGY_MAX_SPOUT_PENDING work, you need to enable the fault-tolerance mechanism (i.e., assign message IDs in spouts, and anchor and ack in bolts). Furthermore, if you emit more than one tuple per call to Spout.nextTuple(), TOPOLOGY_MAX_SPOUT_PENDING will not work as expected.
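The following is a simplified model, not Storm's actual executor code, of why emitting all 4096 tuples in one nextTuple() call defeats the limit: the executor only checks the pending count before calling nextTuple(), so everything a single call emits goes out regardless. MaxPendingGate is a hypothetical name, and the model assumes acking is enabled and that nothing is acked during the run.

```java
// Hypothetical, simplified model of how a spout executor gates nextTuple():
// nextTuple() is only invoked while the number of pending (emitted but not
// yet acked) tuples is below maxSpoutPending. Acking, timeouts and
// threading are ignored; only the in-flight count is tracked.
final class MaxPendingGate {
    private MaxPendingGate() {}

    // Peak pending count when nothing is acked and each nextTuple() call
    // emits `perCall` tuples, until `total` tuples are emitted or the gate closes.
    static int peakPending(int maxSpoutPending, int perCall, int total) {
        int pending = 0;
        int emitted = 0;
        while (emitted < total && pending < maxSpoutPending) {
            int batch = Math.min(perCall, total - emitted);
            emitted += batch;
            pending += batch; // every tuple from this call is now in flight
        }
        return pending;
    }

    public static void main(String[] args) {
        // All 4096 tuples in one call: a limit of 100 is blown through.
        System.out.println(peakPending(100, 4096, 4096)); // prints 4096
        // One tuple per call: the gate stops the spout at exactly 100.
        System.out.println(peakPending(100, 1, 4096));    // prints 100
    }
}
```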
Emitting more than a single tuple per Spout.nextTuple() call is actually bad practice for other reasons as well (see "Why should I not loop or block in Spout.nextTuple()" for more details).
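A sketch of the usual workaround: keep the expanded work items in an internal queue and emit at most one per nextTuple() call, each with a message ID. It is written without the Storm API so it runs standalone; DrainingSpout, onTrigger() and the BiConsumer emitter are hypothetical stand-ins for your spout, your trigger handling, and SpoutOutputCollector.emit(values, messageId).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of the "one tuple per nextTuple()" pattern. In a real
// spout, `emitter` would wrap SpoutOutputCollector.emit(values, messageId),
// and ack()/fail() would be the spout's own ack/fail callbacks.
final class DrainingSpout {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> inFlight = new HashMap<>();
    private final BiConsumer<String, Long> emitter;
    private long nextId = 0;

    DrainingSpout(BiConsumer<String, Long> emitter) {
        this.emitter = emitter;
    }

    // On a trigger message, expand it into `count` work items and only
    // queue them; nothing is emitted here.
    void onTrigger(String trigger, int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(trigger + "-" + i);
        }
    }

    // Emits at most one tuple and returns immediately, so the executor can
    // re-check max spout pending before the next call.
    void nextTuple() {
        String item = queue.pollFirst();
        if (item == null) return; // nothing queued; don't loop or block
        long msgId = nextId++;
        inFlight.put(msgId, item);
        emitter.accept(item, msgId); // the message ID makes the tuple trackable
    }

    // Fully processed downstream: forget it.
    void ack(long msgId) { inFlight.remove(msgId); }

    // Failed or timed out: re-queue at the front so it is retried first.
    void fail(long msgId) {
        String item = inFlight.remove(msgId);
        if (item != null) queue.addFirst(item);
    }

    int pending() { return inFlight.size(); }
    int queued() { return queue.size(); }
}
```

Because each call returns after at most one emit, the executor can stop calling nextTuple() once topology.max.spout.pending tuples are in flight, and acks from the bolts open the gate again. How the trigger message itself is read is left out of this sketch.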