
How does the Storm handle nextTuple in the Bolt

I am a newbie to Storm and have created a program to read incremented numbers for a certain period of time. I have used a counter in the Spout, and in the nextTuple() method the counter is emitted and incremented:

_collector.emit(new Values(new Integer(currentNumber++))); 
/* how is this method being called continuously? */
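
For reference, the surrounding spout probably looks something like the sketch below; everything outside the emit line is my assumption, since only the snippet above was posted. The key point is that Storm itself calls nextTuple() in a loop:

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class NumberSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private int currentNumber = 1;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;   // called once when the spout task starts
    }

    public void nextTuple() {
        // Storm's executor thread invokes this method repeatedly for as long
        // as the topology is active; each call emits the counter once.
        _collector.emit(new Values(new Integer(currentNumber++)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));   // read by the bolt as position 0
    }
}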

The execute() method of the Bolt looks like this:

public void execute(Tuple input) {
    int number = input.getInteger(0);               // read the value emitted by the spout (position 0)
    logger.info("This number is (" + number + ")");
    _outputCollector.ack(input);                    // acknowledge that this tuple was processed successfully
}
/* this part I am clear on, as the Bolt receives the input from the Spout */

In my main class I have the following code:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("NumberSpout", new NumberSpout());
builder.setBolt("NumberBolt", new PrimeNumberBolt())
            .shuffleGrouping("NumberSpout");
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("NumberTest", config, builder.createTopology());
Utils.sleep(10000);
localCluster.killTopology("NumberTest");
localCluster.shutdown();
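
As an aside, setSpout and setBolt also accept an optional parallelism hint that controls how many executors run each component; this becomes relevant to the workers/executors/tasks discussion in the answers below. The same wiring, purely for illustration:

builder.setSpout("NumberSpout", new NumberSpout(), 1);    // 1 executor for the spout
builder.setBolt("NumberBolt", new PrimeNumberBolt(), 2)   // 2 executors for the bolt
       .shuffleGrouping("NumberSpout");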

The program works perfectly fine. What I am trying to understand is how the Storm framework internally calls the nextTuple() method continuously. I am sure my understanding is missing something, and because of this gap I am unable to connect to the internal logic of this framework.

Can anyone help me understand this part clearly? It would be a great help, as I will have to apply this concept in my project. If I am conceptually clear here, I can make significant progress. I'd appreciate a quick reply. Awaiting responses...

asked Dec 05 '13 by JavaPassion

People also ask

What is Storm spout and bolt?

A spout emits data to one or more bolts. A bolt represents a node in the topology containing the smallest unit of processing logic; the output of a bolt can be emitted into another bolt as input. Storm keeps the topology running until you kill it.

What is Shufflegrouping in Storm?

Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples. Fields grouping: The stream is partitioned by the fields specified in the grouping.
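
For illustration, here is how the two groupings differ when wiring a topology. The component names and bolt classes below are made up for the example:

builder.setBolt("printBolt", new PrinterBolt(), 4)       // shuffle: random but even distribution
       .shuffleGrouping("splitBolt");

builder.setBolt("countBolt", new WordCountBolt(), 4)     // fields: the same "word" value
       .fieldsGrouping("splitBolt", new Fields("word")); // always reaches the same task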

How does Apache storm work?

Apache Storm works for real-time data just as Hadoop works for batch processing of data (Batch processing is the opposite of real-time. In this, data is divided into batches, and each batch is processed. This isn't done in real-time.)

What is spout in Storm?

There are just three abstractions in Apache Storm: spouts, bolts, and topologies. A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere like the Twitter streaming API.


2 Answers

how does the Storm framework internally call the nextTuple() method continuously

I believe answering this properly involves a fairly detailed discussion of the entire life cycle of a Storm topology, as well as a clear concept of the different entities involved: workers, executors, tasks, etc. The actual submission of a topology is carried out by the StormSubmitter class via its submitTopology method.

The very first thing it does is upload the jar using Nimbus's Thrift interface; it then calls submitTopology, which eventually submits the topology to Nimbus.

Nimbus then starts by normalizing the topology (from the doc: the main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly), followed by serialization, ZooKeeper handshaking, supervisor and worker process startup, and so on. It is too broad to discuss here, but if you really want to dig deeper you can go through the lifecycle of a storm topology, which explains nicely the step-by-step actions performed during the entire process.
(A quick note from the documentation:)

First a couple of important notes about topologies:

The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing).

The implicit topology is created via the system-topology! function. system-topology! is used in two places:
- when Nimbus is creating tasks for the topology
- in the worker so it knows where it needs to route messages

Now, here are a few clues I can share...
Spouts and Bolts are the components that do the real processing (the logic). In Storm terminology, they run as many tasks across the cluster.
From the doc page: Each task corresponds to one thread of execution.

Now, among many other things, one typical responsibility of a worker process (read here) in Storm is to monitor whether a topology is active or not, and to store that particular state in a variable named storm-active-atom. This variable is used by the tasks to determine whether or not to call the nextTuple method. So as long as your topology is live (you haven't posted your spout code, but I am assuming), i.e. until your timer expires (as you said, for a certain time), it will keep calling the nextTuple method. You can dig even further into Storm's acking framework implementation to understand how it recognizes and acknowledges that a tuple has been successfully processed; see Guaranteeing Message Processing.
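
Conceptually, the spout's task thread behaves roughly like the loop below. This is only a Java-flavored sketch of the idea, not actual framework code (the real loop lives in Storm's Clojure executor code, and stormActiveAtom is my stand-in name for the storm-active-atom state):

// Rough sketch only; not actual Storm source code.
while (stormActiveAtom.get()) {   // keep going while the topology is active
    spout.nextTuple();            // may emit zero or more tuples per call
    // acks and fails for previously emitted tuples are dispatched
    // to the spout on this same thread
}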

I am sure that my understanding is missing something here and due to this gap I am unable to connect to the internal logic of this framework

Having said that, I think it is more important at this early stage to get a clear understanding of how to work with Storm than of how Storm works internally. For example, instead of learning the internal mechanisms of Storm, it is important to realize that if we set up a spout to read a file line by line, it keeps emitting each line using the _collector.emit method until it reaches EOF, and the bolt connected to it receives each line in its execute(Tuple input) method.
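
To make that concrete, here is a minimal sketch of such a file-reading spout. It is my own illustration, not code from the question; the file path and field name are assumptions:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class FileLineSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private BufferedReader reader;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            reader = new BufferedReader(new FileReader("input.txt")); // path assumed for the example
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void nextTuple() {
        try {
            String line = reader.readLine();
            if (line != null) {
                _collector.emit(new Values(line)); // one line per call to nextTuple
            } else {
                Utils.sleep(1);                    // EOF: nothing left to emit
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}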

Hope this helps; feel free to share more with us in the future.

answered Oct 31 '22 by user2720864


Ordinary Spouts

There is a loop in Storm's executor daemon that repeatedly calls nextTuple (as well as ack and fail, when appropriate) on the corresponding spout instance.

There is no waiting for tuples to be processed. The spout simply receives fail calls for tuples that did not get fully processed within the configured timeout. This can easily be simulated with a topology of a fast spout and a slow processing bolt: the spout will receive a lot of fail calls.
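
A sketch of that experiment (FastSpout and SlowBolt are hypothetical names). Note that Storm only tracks, and therefore only fails, tuples that the spout emits with a message ID:

Config config = new Config();
config.setMessageTimeoutSecs(5);  // tuples not fully acked within 5 seconds are failed

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("fastSpout", new FastSpout());   // emits with a message ID, e.g. _collector.emit(new Values(v), msgId)
builder.setBolt("slowBolt", new SlowBolt())       // e.g. sleeps longer than the timeout in execute()
       .shuffleGrouping("fastSpout");
// With this setup the spout's fail(Object msgId) method is called frequently.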

See also the ISpout javadoc:

nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.
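
In code, that advice might look like the following nextTuple implementation, where queue stands in for whatever in-memory source the spout drains (an assumption for the example):

public void nextTuple() {
    String msg = queue.poll();
    if (msg == null) {
        Utils.sleep(1);                   // nothing to emit: yield briefly instead of busy-spinning
    } else {
        _collector.emit(new Values(msg));
    }
}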


Trident Spouts

The situation is completely different for Trident spouts:

By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput – and lower latency of processing of each batch – by pipelining the batches. You configure the maximum amount of batches to be processed simultaneously with the topology.max.spout.pending property.

Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among batches.
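
In practice you would raise that limit like any other topology config. A sketch, where tridentTopology is an assumed, already-built TridentTopology:

Config conf = new Config();
conf.setMaxSpoutPending(20);  // topology.max.spout.pending: up to 20 batches in flight

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TridentTest", conf, tridentTopology.build());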

answered Oct 31 '22 by dedek