Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Invalid Topology Exception error while submitting a topology

While working on my first Storm Topology that is based on reading Apache file and process it, I am getting weird error when submitting Topology:

6044 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='apachelog') #<InvalidTopologyException InvalidTopologyException(msg:Component: [lineBolt] subscribes from non-existent component [line])>
6051 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
    at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:151) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:287) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopologyWithOpts(nimbus.clj:932) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopology(nimbus.clj:950) ~[storm-core-0.9.0.1.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65]
    at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.4.0.jar:na]
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.4.0.jar:na]
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:236) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.0.1.jar:na]
    at storm.starter.ApacheAccessLogTopology.main(ApacheAccessLogTopology.java:68) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65]
    at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) ~[idea_rt.jar:na]

My Sprout file

public class LogReaderSpout extends BaseRichSpout
{
    private SpoutOutputCollector _collector;
    private String filePath = "access_log";

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        _collector = collector;
    }

    @Override
    public void nextTuple()
    {
        Utils.sleep(100);
        String line = null;
        try
        {
            BufferedReader bufferReader = new BufferedReader(new FileReader(filePath));
            while((line = bufferReader.readLine()) != null)
            {
                if(line!=null)
                {
                    System.out.println(line);
                    _collector.emit(new Values(line));
                }
            }

        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Emitting Next Tuple..");
    }

    @Override
    public void ack(Object id)
    {
        System.out.println("Ack with ID: "+id);
    }

    @Override
    public void fail(Object id)
    {
        System.out.println("Fail with ID: "+id);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

Topology

package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.starter.spout.LogReaderSpout;

import java.util.Map;

public class ApacheAccessLogTopology
{
    public static class LineBolt extends BaseRichBolt
    {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector)
        {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple)
        {
            //ALL PROCESSING will take place here on tuple(in our case Line here)
            _collector.emit(tuple, new Values(tuple.getString(0) + "???"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields("line"));
        }


    }

    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("lineSpout", new LogReaderSpout(),2);
        builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("line");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(2);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("apachelog", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("apachelog");
            cluster.shutdown();
        }
    }

}
like image 792
Volatil3 Avatar asked Jan 21 '14 20:01

Volatil3


2 Answers

Your topology definition isn't valid. You should have:

builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("lineSpout");
like image 178
Chiron Avatar answered Oct 31 '22 11:10

Chiron


Faced same problem as above and below is my exception

Topology submission exception. (topology name='Getting-Started-Toplogie') <InvalidTopologyException InvalidTopologyException(msg:Component: [Record-normalizertt] subscribes from non-existent stream: [default] of component [Rreader])>

And here is my solution this worked to me.

Declare below statement in method declareOutputFields(OutputFieldsDeclarer declarer) in your spout or bolt class.

declarer.declare(new Fields("line"));

Finally your method should be as below in your spout or bolt class.

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    declarer.declare(new Fields("line"));

    }
like image 42
Vasanth Kumar Avatar answered Oct 31 '22 11:10

Vasanth Kumar