
Stop processing a tuple in a certain bolt

Tags:

apache-storm

I have a topology composed of, for example, 1 spout and 4 bolts:

spout A -> bolt B -> bolt C -> bolt E
                  -> bolt D

Bolt B passes a tuple to bolt C and bolt D only if some condition in bolt B is true.

Likewise, bolt C passes a tuple to bolt E only if some condition in bolt C is true.

So a single tuple may reach only bolt B, or it may also reach bolts C and D.

I'm using BaseBasicBolt, which, to my knowledge, acks automatically after collector.emit is called.

For example, the execute method in bolt B looks like this:

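public class boltB extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // ...some logic goes here
        if (response.getCount() > 0) {
            // Emit downstream (to bolts C and D) only when the condition holds
            collector.emit(new Values(tuple.getString(0)));
        }
    }
    // declareOutputFields(...) omitted
}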

So if collector.emit is not called, I think the tuple from the spout fails, because I see in the Storm UI that almost all of the tuples from the spout have failed.

In this case, where should I call ack so that the spout does not consider it a failed tuple?

asked Jun 17 '14 05:06 by Adrian Seungjin Lee


2 Answers

What you are doing is correct for the logic you are implementing. You do not need to call ack() explicitly. When using BaseBasicBolt, each tuple is acked by BasicBoltExecutor after the execute() method returns, whether or not anything was emitted. For the failed tuples, you should check for exceptions. Also look at the Storm UI for anomalies in the emitted/executed/failed tuple counts of each spout and bolt.
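To make that contract concrete, here is a minimal, Storm-free sketch in Java. It does not use the Storm API: `AutoAckModel`, `BasicBolt`, `runExecutor` and the local `FailedException` are hypothetical stand-ins, and the emit condition `input.length() > 3` is an invented placeholder for bolt B's real check. It only illustrates the rule described above: the executor acks the input once execute() returns normally, emitted or not, and fails it only when a FailedException is thrown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of BaseBasicBolt's auto-ack behavior; not Storm code.
public class AutoAckModel {

    // Local stand-in for Storm's FailedException (hypothetical, not the real class).
    public static class FailedException extends RuntimeException {
        public FailedException(String msg) {
            super(msg);
        }
    }

    // Stand-in for a basic bolt's execute(); returns the values it "emits".
    public interface BasicBolt {
        List<String> execute(String input);
    }

    // Records the fate of each input tuple: "ack" or "fail".
    public static final List<String> outcomes = new ArrayList<>();

    // Same shape as the executor: run the bolt, then ack; fail on FailedException.
    public static void runExecutor(BasicBolt bolt, String input) {
        try {
            bolt.execute(input); // emitted values are ignored; only ack/fail is modeled
            outcomes.add("ack");
        } catch (FailedException e) {
            outcomes.add("fail");
        }
    }

    public static void main(String[] args) {
        // Like bolt B in the question: emits only when a (hypothetical) condition holds.
        BasicBolt boltB = input -> {
            List<String> emitted = new ArrayList<>();
            if (input.length() > 3) { // placeholder for response.getCount() > 0
                emitted.add(input);
            }
            return emitted;
        };
        // A bolt that signals failure by throwing.
        BasicBolt failing = input -> {
            throw new FailedException("bad input");
        };

        runExecutor(boltB, "hello"); // emits, then acked
        runExecutor(boltB, "hi");    // emits nothing, still acked
        runExecutor(failing, "x");   // throws, failed
        System.out.println(outcomes); // [ack, ack, fail]
    }
}
```

The point of the second call is the asker's case: not emitting does not turn an ack into a failure.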

answered Sep 23 '22 14:09 by Ben Tse


When you use BaseBasicBolt, acking is done for you, even if you don't emit anything.

The BaseBasicBolt instance is executed in BasicBoltExecutor, whose execute() method is shown below:

public void execute(Tuple input) {
     _collector.setContext(input);
     try {
         _bolt.execute(input, _collector);
         _collector.getOutputter().ack(input);
     } catch(FailedException e) {
         if(e instanceof ReportedFailedException) {
             _collector.reportError(e);
         }
         _collector.getOutputter().fail(input);
     }
}

So, to stop processing a tuple, just don't emit anything: once execute() returns, the tuple is acked. Since there are no more bolts left in that tuple's tree, the spout's ack callback will be called.

Hope this answers your question.
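Why the spout's ack fires as soon as bolt B returns without emitting can be illustrated with Storm's XOR-based tuple tracking. The sketch below is a simplified, hypothetical model rather than Storm code: real Storm assigns random 64-bit ids and does this bookkeeping in a separate acker task, whereas `AckerModel`, `emitRoot` and `finish` are illustrative names, and each tuple here gets a distinct single-bit id so the arithmetic is deterministic. Every id is XORed into a running value once when the tuple is created and once when it is acked, so the value returns to zero exactly when the whole tuple tree has been processed.

```java
// Hypothetical, single-tuple model of XOR-based acking; not Storm code.
public class AckerModel {
    private long ackVal = 0;         // XOR of every tuple id created or acked so far
    private int nextBit = 0;         // distinct ids: 1, 2, 4, 8, ... (model simplification)
    private boolean spoutAcked = false;

    private long newId() {
        return 1L << nextBit++;
    }

    // The spout emits the root tuple: its id is XORed in once.
    public long emitRoot() {
        long id = newId();
        ackVal ^= id;
        return id;
    }

    // A bolt finishes inputId after emitting childCount anchored tuples
    // (one per downstream task receiving a copy). The input id is XORed a
    // second time, cancelling it, and each new child id is XORed in once.
    public long[] finish(long inputId, int childCount) {
        long[] children = new long[childCount];
        long update = inputId;
        for (int i = 0; i < childCount; i++) {
            children[i] = newId();
            update ^= children[i];
        }
        ackVal ^= update;
        if (ackVal == 0) {
            spoutAcked = true; // tree complete: the spout's ack callback would run now
        }
        return children;
    }

    public boolean isSpoutAcked() {
        return spoutAcked;
    }

    public static void main(String[] args) {
        // Case 1: bolt B emits nothing; its automatic ack alone completes the tree.
        AckerModel stopAtB = new AckerModel();
        long root = stopAtB.emitRoot();
        stopAtB.finish(root, 0);
        System.out.println("stop at B -> spout acked: " + stopAtB.isSpoutAcked()); // true

        // Case 2: B emits to C and D, C emits to E.
        AckerModel fullPath = new AckerModel();
        long r = fullPath.emitRoot();
        long[] fromB = fullPath.finish(r, 2);        // copies for C and D
        long[] fromC = fullPath.finish(fromB[0], 1); // C emits to E
        fullPath.finish(fromB[1], 0);                // D emits nothing
        System.out.println("before E acks: " + fullPath.isSpoutAcked()); // false
        fullPath.finish(fromC[0], 0);                // E emits nothing
        System.out.println("after E acks: " + fullPath.isSpoutAcked());  // true
    }
}
```

With only bolt B involved, its single ack cancels the root id immediately; on the longer path the spout is acked only after the last leaf (bolt E) has returned.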

answered Sep 20 '22 14:09 by Mzf