I changed the WordCount class in WordCountTopology as follows:
public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        OutputStream o;
        try {
            o = new FileOutputStream("~/abc.txt", true);
            o.write(word.getBytes());
            o.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
in which I write each word to the file abc.txt.
When I ran the WordCountTopology in local mode (using LocalCluster), it worked fine. But when I ran it in distributed mode (using the StormSubmitter.submitTopology() method), the WordCount class didn't write any words to abc.txt, as if the execute() method had not run at all. Could anyone give me some idea? Thanks a lot!
P.S. I'm sure my nimbus, supervisor, ui, and zookeeper are running normally, and I can see the task at 127.0.0.1:8080.
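For reference, the submission logic follows the usual storm-starter pattern, roughly like this (the packages are backtype.storm.* or org.apache.storm.* depending on the Storm version; the spout, split bolt, and parallelism hints here just follow that example):

// Roughly the storm-starter submission pattern.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

Config conf = new Config();
if (args != null && args.length > 0) {
    // distributed mode: tasks run in worker JVMs on the supervisor machines
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
    // local mode: the whole topology runs inside this one JVM
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("word-count", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.shutdown();
}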
The main problem is the location of the abc.txt file. In distributed mode the bolt executes on the supervisor (worker) machines, not on the machine from which you submit the topology, so the file is written there (if it can be written at all) and won't show up where you are looking for it. You can check the supervisor logs for a file-not-found or permission error. To resolve this you need shared storage, for example an NFS mount that all cluster machines can access. After configuring NFS, create the file in that common location so it is available to all supervisors.
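As a minimal sketch, assuming a hypothetical NFS mount at /mnt/shared, the execute() method could append to the shared file like this (note that Java does not expand "~", so an absolute path avoids that pitfall as well):

// Drop-in replacement for the execute() above; /mnt/shared/abc.txt is a
// placeholder for wherever the shared NFS directory is mounted. The path must
// exist and be writable on every supervisor machine, because in distributed
// mode the bolt runs on the workers, not on the submitting machine.
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null) count = 0;
    count++;
    counts.put(word, count);

    // append the word plus a newline to the shared file
    try (OutputStream o = new FileOutputStream("/mnt/shared/abc.txt", true)) {
        o.write((word + System.lineSeparator()).getBytes());
    } catch (IOException e) {
        e.printStackTrace();
    }

    collector.emit(new Values(word, count));
}

Keep in mind that with several count tasks running in parallel their appends to the same file will interleave, so this is only a quick debugging aid.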