Executing Sample Flink Program in Local

I am trying to execute a sample program in Apache Flink in local mode.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        //DataSet<String> text1 = env.readTextFile(args[0]);

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())


        env.execute("Word Count Example");

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));

It is giving me exception :

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/InputFormat
    at WordCountExample.main(WordCountExample.java:10)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.InputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 1 more

What am I doing wrong?

I have used the correct jars also. flink-java-0.9.0-milestone-1.jar flink-clients-0.9.0-milestone-1.jar flink-core-0.9.0-milestone-1.jar

2 Answers

Adding the three Flink Jar files as dependencies in your project is not enough because they have other transitive dependencies, for example on Hadoop.

The easiest way to get a working setup to develop (and locally execute) Flink programs is to follow the quickstart guide which uses a Maven archetype to configure a Maven project. This Maven project can be imported into your IDE.

NoClassDefFoundError extends LinkageError

Thrown if the Java Virtual Machine or a ClassLoader instance tries to load in the definition of a class (as part of a normal method call or as part of creating a new instance using the new expression) and no definition of the class could be found. The searched-for class definition existed when the currently executing class was compiled, but the definition can no longer be found.

Your code/jar dependent to hadoop. Found it here download jar file and add it in your classpath org.apache.hadoop.mapreduce.InputFormat

