Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

java.lang.ClassNotFoundException: TopologyMain

I am trying to submit a simple word count topology to my local Storm cluster. First, I tried using Maven, and then the Storm command-line client. I created the JAR file using Eclipse, but it throws a "main class not found" exception. Can anyone tell me what the problem could be? I am attaching the code and the exception below.

package com.test.newpackage;

import com.test.newpackage.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.test.newpackage.WordCounter;
import com.test.newpackage.WordNormalizer;

/**
 * Entry point that assembles the word-count topology (one spout feeding two
 * bolts) and runs it on an in-process {@link LocalCluster}.
 */
public class TopologyMain {
    /**
     * Builds the topology, submits it to a local cluster, lets it run for one
     * second and then shuts the cluster down.
     *
     * @param args {@code args[0]} is stored in the topology configuration
     *             under the key {@code "wordsFile"}
     * @throws IllegalArgumentException if no argument is supplied
     * @throws InterruptedException if the thread is interrupted while the
     *             topology is running
     */
    public static void main(String[] args) throws InterruptedException {
        // Fail early with a readable message instead of an
        // ArrayIndexOutOfBoundsException further down.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: TopologyMain <path-to-words-file>");
        }

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping(
                "word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Topology run
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("Getting-Started-Toplogie", conf,
                    builder.createTopology());
            Thread.sleep(1000);
        } finally {
            // Always stop the local cluster, even if submission fails or the
            // sleep is interrupted.
            cluster.shutdown();
        }
    }
}

package com.test.newpackage;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that keeps an in-memory count of every word it receives and prints the
 * totals from {@link #cleanup()}.
 */
public class WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    private OutputCollector collector;

    /**
     * Stores the collector, records this component's id and task id, and
     * starts with an empty counter map.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    /**
     * Increments the counter for the word held in the first field of the
     * tuple, then acknowledges the tuple.
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer seenSoFar = counters.get(word);
        // A missing entry means this is the first occurrence of the word.
        counters.put(word, seenSoFar == null ? 1 : seenSoFar + 1);
        collector.ack(input);
    }

    /**
     * Prints a header with the component name and task id, followed by one
     * "word: count" line per counted word, to standard output.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (String word : counters.keySet()) {
            System.out.println(word + ": " + counters.get(word));
        }
    }

    /**
     * Declares nothing: this bolt does not emit tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * Returns {@code null}, i.e. no component-specific configuration.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package com.test.newpackage;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits each incoming line into words, lower-cases them and emits
 * one tuple per non-empty word, anchored to the input tuple.
 */
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;

    /**
     * Nothing to release.
     */
    @Override
    public void cleanup() {
    }

    /**
     * Reads the line from the first field of the tuple, splits it on single
     * spaces, trims and lower-cases each piece, and emits every non-empty
     * word. The input tuple is acknowledged once all words have been emitted.
     */
    @Override
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                // Anchor directly on the input tuple; no need to build a
                // throwaway raw list for a single anchor.
                collector.emit(input, new Values(word.toLowerCase()));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    /**
     * Stores the collector used to emit and acknowledge tuples.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * The bolt only emits the field "word".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /**
     * Returns {@code null}, i.e. no component-specific configuration.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

package com.test.newpackage;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that reads the file named by the "wordsFile" configuration entry and
 * emits each line as a tuple with the single field "line".
 */
public class WordReader implements IRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;

    public boolean isDistributed() {
        return false;
    }

    /**
     * Prints the id of a successfully processed message.
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    /**
     * Releases the file handle if it is still open.
     */
    @Override
    public void close() {
        closeReader();
    }

    /**
     * Prints the id of a message that failed processing.
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * On the first call, emits every line of the file (using the line itself
     * as the message id) and closes the file. On later calls it only sleeps
     * for one second and returns.
     *
     * @throws RuntimeException if reading the file fails
     */
    @Override
    public void nextTuple() {
        // This method is invoked repeatedly; once the file has been consumed
        // just wait a little so the loop does not spin.
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag so the calling thread can see the
                // interruption.
                Thread.currentThread().interrupt();
            }
            return;
        }
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            String str;
            // Read all lines, emitting one tuple per line
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
            // The file is never read again, so release it now.
            closeReader();
        }
    }

    /**
     * Opens the file named by the "wordsFile" configuration entry and stores
     * the context and collector.
     *
     * @throws RuntimeException if the file cannot be found
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            // Report the same key that was used for the lookup and keep the
            // original exception as the cause.
            throw new RuntimeException("Error reading file["
                    + conf.get("wordsFile") + "]", e);
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line"
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    /**
     * No action on activation.
     */
    @Override
    public void activate() {
    }

    /**
     * No action on deactivation.
     */
    @Override
    public void deactivate() {
    }

    /**
     * Returns {@code null}, i.e. no component-specific configuration.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Closes the underlying file reader once; safe to call repeatedly. */
    private void closeReader() {
        if (fileReader == null) {
            return;
        }
        try {
            fileReader.close();
        } catch (java.io.IOException ignored) {
            // The reader was only used for reading; a failure on close
            // cannot lose data, so it is deliberately ignored.
        } finally {
            fileReader = null;
        }
    }
}

    mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="resources/words.txt"

    storm jar TopologyMain.jar TopologyMain wordcount

     Exception in thread "main" java.lang.NoClassDefFoundError: TopologyMain
        Caused by: java.lang.ClassNotFoundException: TopologyMain
            at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        Could not find the main class: TopologyMain. Program will exit.

EDIT Exception:

 backtype.storm.daemon.nimbus  - Activating word-count: word-count-1-1374756437
1724 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn  - Thread Thread[main,5,main] died
java.lang.NullPointerException
    at clojure.lang.Numbers.ops(Numbers.java:942)
    at clojure.lang.Numbers.isPos(Numbers.java:94)
    at clojure.core$take$fn__4112.invoke(core.clj:2500)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$fn__3804.invoke(core.clj:662)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$map$fn__4091.invoke(core.clj:2437)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
    at clojure.lang.RestFn.invoke(RestFn.java:410)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto__$reify__3603.submitTopology(nimbus.clj:898)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:227)
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19)
    at backtype.storm.LocalCluster.submitTopology(Unknown Source)
    at storm.starter.WordCountTopology.main(WordCountTopology.java:83)
11769 [Thread-2] ERROR backtype.storm.daemon.nimbus  - Error when processing event
java.lang.NullPointerException
    at clojure.lang.Numbers.ops(Numbers.java:942)
    at clojure.lang.Numbers.isPos(Numbers.java:94)
    at clojure.core$take$fn__4112.invoke(core.clj:2500)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$fn__3804.invoke(core.clj:662)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$map$fn__4091.invoke(core.clj:2437)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
    at clojure.lang.RestFn.invoke(RestFn.java:410)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596$fn__3597.invoke(nimbus.clj:860)
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596.invoke(nimbus.clj:859)
    at backtype.storm.timer$schedule_recurring$this__1753.invoke(timer.clj:69)
    at backtype.storm.timer$mk_timer$fn__1736$fn__1737.invoke(timer.clj:33)
    at backtype.storm.timer$mk_timer$fn__1736.invoke(timer.clj:26)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:679)
11774 [Thread-2] INFO  backtype.storm.util  - Halting process: ("Error when processing an event")
like image 264
Naresh Avatar asked Oct 04 '22 12:10

Naresh


1 Answer

Naresh, based on what I see the solution to your problem may lie in the class name you used. Here is the command line argument you specified:

storm jar TopologyMain.jar TopologyMain wordcount

You refer to your main class as "TopologyMain" using only the class name rather than the fully qualified name. Here is my revision of your storm command line attempt:

storm jar TopologyMain.jar com.test.newpackage.TopologyMain

I use the full package name instead of just the class by itself. Note that I also removed the reference to "wordcount" because I don't know what it's doing there (there are no classes, methods, or variables called "wordcount" in your code sample).

Here is an excellent article on Google groups which covers early setup problems with Storm: early setup question

I found myself using this article for the first couple of weeks when I started using Storm.

Please let us know if this solves your problem.

like image 161
Tim Biegeleisen Avatar answered Oct 07 '22 19:10

Tim Biegeleisen