I have started using storm, so I create simple topology using this tutorial
When I run my topology with LocalCluster
and all seem fine,
My Problem is that I'm not getting ACK on the tuple, meaning my spout ack
is never called.
my code is below - do you know why ack
is not called ?
so my topology look like this
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
My Spout look like this
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}
@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}
and my bolt look like this
@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}
public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
Your emit() method in the spout has only one argument, so that tuple isn't anchored. That's why you're not getting a call back to the ack() method in the spout even though you're ack'ing the tuple in the bolt.
To get this to work, you need to modify your spout to emit a second argument which is the message id. It is this id that's passed back to the ack() method in the spout:
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
Object msgId = "ID 6"; // this can be any object
this.collector.emit(new Values(6), msgId);
flag = true;
}
}
@Override
public void ack(Object msgId) {
// msgId should be "ID 6"
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With