While working on my first Storm topology, which reads an Apache access log file and processes it, I get a strange error when submitting the topology:
6044 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='apachelog') #<InvalidTopologyException InvalidTopologyException(msg:Component: [lineBolt] subscribes from non-existent component [line])>
6051 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:151) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:287) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopologyWithOpts(nimbus.clj:932) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopology(nimbus.clj:950) ~[storm-core-0.9.0.1.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65]
at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.4.0.jar:na]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.4.0.jar:na]
at backtype.storm.testing$submit_local_topology.invoke(testing.clj:236) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.0.1.jar:na]
at storm.starter.ApacheAccessLogTopology.main(ApacheAccessLogTopology.java:68) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65]
at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) ~[idea_rt.jar:na]
My spout file:
public class LogReaderSpout extends BaseRichSpout
{
    private SpoutOutputCollector _collector;
    private String filePath = "access_log";

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        _collector = collector;
    }

    @Override
    public void nextTuple()
    {
        Utils.sleep(100);
        String line = null;
        try
        {
            BufferedReader bufferReader = new BufferedReader(new FileReader(filePath));
            while((line = bufferReader.readLine()) != null)
            {
                if(line!=null)
                {
                    System.out.println(line);
                    _collector.emit(new Values(line));
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Emitting Next Tuple..");
    }

    @Override
    public void ack(Object id)
    {
        System.out.println("Ack with ID: "+id);
    }

    @Override
    public void fail(Object id)
    {
        System.out.println("Fail with ID: "+id);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
Topology
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.starter.spout.LogReaderSpout;
import java.util.Map;

public class ApacheAccessLogTopology
{
    public static class LineBolt extends BaseRichBolt
    {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector)
        {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple)
        {
            //ALL PROCESSING will take place here on tuple(in our case Line here)
            _collector.emit(tuple, new Values(tuple.getString(0) + "???"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields("line"));
        }
    }

    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("lineSpout", new LogReaderSpout(),2);
        builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("line");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("apachelog", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("apachelog");
            cluster.shutdown();
        }
    }
}
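Your topology definition isn't valid. shuffleGrouping expects the ID of the component to subscribe to (the first argument you passed to setSpout), not the name of an output field. Your spout is registered as "lineSpout"; "line" is only the field it declares, so Storm cannot find a component with that ID. You should have: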
builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("lineSpout");
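To make the failure easier to picture, here is a minimal sketch of the kind of check that rejects the topology. It is a simplified model in plain Java, not Storm's actual validation code; the class TopologyWiringCheck and the method findBrokenSubscriptions are hypothetical names made up for this illustration. The only thing taken from the question is the wording of the error message and the IDs "lineSpout", "lineBolt" and "line".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of a submit-time wiring check; NOT Storm's real code.
class TopologyWiringCheck {

    // subscriptions maps a subscribing component ID to the component ID it reads from.
    // Returns one message per subscription whose source ID was never registered.
    static List<String> findBrokenSubscriptions(Set<String> componentIds,
                                                Map<String, String> subscriptions) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> entry : subscriptions.entrySet()) {
            if (!componentIds.contains(entry.getValue())) {
                errors.add("Component: [" + entry.getKey()
                        + "] subscribes from non-existent component ["
                        + entry.getValue() + "]");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // IDs registered through setSpout / setBolt in the question.
        Set<String> componentIds =
                new LinkedHashSet<>(Arrays.asList("lineSpout", "lineBolt"));

        // The question's wiring: the bolt subscribes to the field name "line".
        Map<String, String> asPosted = new LinkedHashMap<>();
        asPosted.put("lineBolt", "line");

        // The corrected wiring: the bolt subscribes to the spout's component ID.
        Map<String, String> corrected = new LinkedHashMap<>();
        corrected.put("lineBolt", "lineSpout");

        System.out.println(findBrokenSubscriptions(componentIds, asPosted));
        System.out.println(findBrokenSubscriptions(componentIds, corrected));
    }
}
```

The point of the sketch is only that the lookup is by component ID: field names such as "line" never enter the set that subscriptions are checked against.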
I faced a similar problem. This was my exception:
Topology submission exception. (topology name='Getting-Started-Toplogie') <InvalidTopologyException InvalidTopologyException(msg:Component: [Record-normalizertt] subscribes from non-existent stream: [default] of component [Rreader])>
Here is the solution that worked for me.
Add the following statement to the declareOutputFields(OutputFieldsDeclarer declarer) method of your spout or bolt class (the component that the error names as the source of the stream):
declarer.declare(new Fields("line"));
The finished method in your spout or bolt class should look like this:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
}
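As a rough illustration of why declaring the output fields makes the "non-existent stream: [default]" error go away, here is a small model in plain Java. It is a sketch under assumptions, not Storm's implementation: DeclaredStreamsCheck, addComponent, declareDefault and canSubscribe are hypothetical names invented for this example. It assumes, as in Storm, that declaring fields without naming a stream declares them on the stream called "default".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of declared output streams; NOT Storm's real code.
class DeclaredStreamsCheck {
    static final String DEFAULT_STREAM = "default";

    // component ID -> (stream ID -> declared field names)
    private final Map<String, Map<String, List<String>>> declared = new HashMap<>();

    // Registers a component that has not declared any output stream yet.
    void addComponent(String componentId) {
        declared.put(componentId, new HashMap<String, List<String>>());
    }

    // Models declarer.declare(new Fields(...)): the fields go on the default stream.
    void declareDefault(String componentId, String... fields) {
        declared.get(componentId).put(DEFAULT_STREAM, Arrays.asList(fields));
    }

    // A subscription is valid only if the source exists AND declared that stream.
    boolean canSubscribe(String sourceId, String streamId) {
        Map<String, List<String>> streams = declared.get(sourceId);
        return streams != null && streams.containsKey(streamId);
    }

    public static void main(String[] args) {
        DeclaredStreamsCheck check = new DeclaredStreamsCheck();
        check.addComponent("Rreader");

        // Empty declareOutputFields: the component exists but has no default stream.
        System.out.println(check.canSubscribe("Rreader", DEFAULT_STREAM));

        // After declaring the fields, the default stream exists.
        check.declareDefault("Rreader", "line");
        System.out.println(check.canSubscribe("Rreader", DEFAULT_STREAM));
    }
}
```

Note the difference from the error in the question: here the component ID is found, but the stream it is supposed to emit on was never declared.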