I am new to Storm and have written a program that reads incrementing numbers for a certain period of time. I keep a counter in the spout, and in the nextTuple() method the counter is emitted and incremented:
_collector.emit(new Values(new Integer(currentNumber++)));
/* how this method is being continuously called...*/
and the execute() method of the bolt class is:
public void execute(Tuple input) {
int number = input.getInteger(0);
logger.info("This number is (" + number + ")");
_outputCollector.ack(input);
}
/*this part I am clear as Bolt would receive the input from Spout*/
In my Main class execution I have the following code
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("NumberSpout", new NumberSpout());
builder.setBolt("NumberBolt", new PrimeNumberBolt())
.shuffleGrouping("NumberSpout");
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("NumberTest", config, builder.createTopology());
Utils.sleep(10000);
localCluster.killTopology("NumberTest");
localCluster.shutdown();
The program works perfectly fine. What I am trying to understand is how the Storm framework internally calls the nextTuple() method continuously. I am sure my understanding is missing something here, and because of this gap I am unable to connect to the internal logic of the framework.
Can anyone help me understand this part clearly? It would be a great help, as I will have to implement this concept in my project. If I am conceptually clear here, I can make significant progress. I would appreciate any quick assistance.
Spout emits the data to one or more bolts. Bolt represents a node in the topology having the smallest processing logic and the output of a bolt can be emitted into another bolt as input. Storm keeps the topology always running, until you kill the topology.
Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples. Fields grouping: The stream is partitioned by the fields specified in the grouping.
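To make the difference between the two groupings concrete, here is a minimal plain-Java sketch. It does not use Storm itself: GroupingSketch, shuffleTask and fieldsTask are hypothetical names, the random pick is a simplification of shuffle grouping, and the hash-modulo rule is only an assumption about how a fields-style partitioner can behave.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: hypothetical helper, not part of the Storm API.
final class GroupingSketch {
    // Shuffle-style choice: any task may receive the tuple.
    static int shuffleTask(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    // Fields-style choice: the task depends only on the grouping field values,
    // so tuples with equal values always land on the same task.
    static int fieldsTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4;
        int first = fieldsTask(List.of("user-42"), tasks);
        int second = fieldsTask(List.of("user-42"), tasks);
        System.out.println(first == second);      // prints true
        System.out.println(shuffleTask(tasks));   // some value between 0 and 3
    }
}
```

The point of the sketch is only that a fields grouping is deterministic per key, while a shuffle grouping is not tied to the tuple's content.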
Apache Storm works for real-time data just as Hadoop works for batch processing of data (Batch processing is the opposite of real-time. In this, data is divided into batches, and each batch is processed. This isn't done in real-time.)
There are just three abstractions in Apache Storm: spouts, bolts, and topologies. A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere like the Twitter streaming API.
how does the Storm framework internally calls the nextTuple() method continuously.
I believe this really calls for a detailed discussion of the entire life cycle of a Storm topology, as well as a clear grasp of entities such as workers, executors and tasks. Submitting a topology is handled by the StormSubmitter class through its submitTopology method.
The very first thing it does is upload the jar using Nimbus's Thrift interface; it then calls submitTopology, which eventually submits the topology to Nimbus.
Nimbus then starts by normalizing the topology (from the doc: "The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly"), followed by serialization, the ZooKeeper handshake, supervisor and worker process startup, and so on. It is too broad to cover here, but if you really want to dig deeper you can read the "Lifecycle of a Storm topology" documentation, which explains step by step what happens along the way.
(A quick note from the documentation:)
First a couple of important notes about topologies:
The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing).
The implicit topology is created via the system-topology! function. system-topology! is used in two places:
- when Nimbus is creating tasks for the topology
- in the worker so it knows where it needs to route messages to
Now, here are a few clues I can share.
Spouts and bolts are the components that do the real processing (the logic). In Storm terminology, they execute as many tasks across the cluster.
From the doc page: each task corresponds to one thread of execution.
Now, among many others, one typical responsibility of a worker process in Storm is to monitor whether a topology is active and to store that state in a variable named storm-active-atom. This variable is used by the tasks to decide whether or not to call the nextTuple method. So as long as your topology is live (you haven't posted your spout code, so I am assuming here), that is, until your timer runs out (you said it runs for a certain time), Storm keeps calling nextTuple. You can dig even further into Storm's acking framework to see how it tracks and acknowledges a tuple once it has been successfully processed, and how it guarantees message processing.
I am sure that my understanding is missing something here and due to this gap I am unable to connect to the internal logic of this framework
Having said this, I think at an early stage it is more important to get a clear understanding of how to work with Storm than of how Storm works internally. For example, instead of learning Storm's internal mechanism, it is important to realize that if we set up a spout to read a file line by line, it keeps emitting each line through the _collector.emit method until it reaches EOF, and the bolt connected to it receives each line in its execute(Tuple input) method.
Hope this helps, and that you share more with us in future.
There is a loop in Storm's executor that repeatedly calls nextTuple (as well as ack and fail when appropriate) on the corresponding spout instance.
The loop does not wait for tuples to be processed. The spout simply receives fail for tuples that were not fully processed within the configured timeout.
This can easily be simulated with a topology consisting of a fast spout and a slow bolt: the spout will receive a lot of fail calls.
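The fast-spout, slow-bolt effect can be approximated without a cluster. The sketch below is a deliberately simplified model in logical "ticks", not Storm code: SlowBoltSketch and its parameters are hypothetical, the spout is assumed to emit one tuple per tick, the bolt handles tuples one at a time in FIFO order, and a tuple counts as failed when its total latency exceeds the timeout (in a real topology the timeout comes from topology.message.timeout.secs).

```java
final class SlowBoltSketch {
    // Returns {acked, failed} for the given number of tuples.
    // boltCostTicks: ticks the bolt needs per tuple; timeoutTicks: allowed latency.
    static int[] simulate(int tuples, int boltCostTicks, int timeoutTicks) {
        int acked = 0;
        int failed = 0;
        long boltFreeAt = 0; // tick at which the bolt can start its next tuple
        for (int emitTime = 0; emitTime < tuples; emitTime++) {
            long start = Math.max(emitTime, boltFreeAt); // wait in the queue if the bolt is busy
            long done = start + boltCostTicks;
            boltFreeAt = done;
            if (done - emitTime > timeoutTicks) {
                failed++; // the spout would receive fail for this tuple
            } else {
                acked++;  // the spout would receive ack for this tuple
            }
        }
        return new int[] {acked, failed};
    }

    public static void main(String[] args) {
        int[] slow = simulate(10, 3, 5);
        System.out.println(slow[0] + " acked, " + slow[1] + " failed"); // prints 2 acked, 8 failed
        int[] fast = simulate(10, 1, 5);
        System.out.println(fast[0] + " acked, " + fast[1] + " failed"); // prints 10 acked, 0 failed
    }
}
```

Once the bolt falls behind, the queueing delay only grows, which is why nearly every later tuple times out in the slow case.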
See also the ISpout javadoc:
nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.
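The "courteous sleep" from the javadoc quote can be illustrated with a small plain-Java stand-in. CourteousSpoutSketch is a hypothetical class, not a real Storm spout: its nextTuple only mirrors the shape of the real method, and the in-memory queue stands in for whatever source a spout reads from. In an actual spout, the Utils.sleep helper used in the question's main class could take the place of Thread.sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: not a real Storm spout.
final class CourteousSpoutSketch {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<String> emitted = new ArrayList<>();
    private int idleSleeps = 0;

    void offer(String message) {
        pending.add(message);
    }

    // Emit one item if available; otherwise back off for a millisecond
    // so the tight calling loop does not burn CPU.
    void nextTuple() {
        String message = pending.poll();
        if (message == null) {
            idleSleeps++;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return;
        }
        emitted.add(message); // stands in for _collector.emit(...)
    }

    List<String> emitted() {
        return emitted;
    }

    int idleSleeps() {
        return idleSleeps;
    }

    public static void main(String[] args) {
        CourteousSpoutSketch spout = new CourteousSpoutSketch();
        spout.offer("first");
        for (int i = 0; i < 3; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.emitted() + " idle=" + spout.idleSleeps()); // prints [first] idle=2
    }
}
```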
The situation is completely different for Trident-spouts:
By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput – and lower latency of processing of each batch – by pipelining the batches. You configure the maximum amount of batches to be processed simultaneously with the topology.max.spout.pending property. Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among batches.