Spark cluster fails on bigger input, works well for small

I'm playing with Spark, using the default, pre-built 0.7.0 distribution from the website, with the default config, in cluster mode, with one worker (my localhost). I read the installation docs and everything seems fine.

I have a CSV file (various sizes, 1,000 to 1 million rows). If I run my app with a small input file (for example the 1,000-row one), everything is fine: the program finishes in seconds and produces the expected output. But when I supply a bigger file (100,000 rows, or 1 million), the execution fails. I tried to dig through the logs, but that did not help much: it repeats the whole process about 9-10 times and exits with a failure after that, and there is also some error about fetching from a null source having failed.

The Iterable returned by the flatMap that builds the first JavaRDD looks suspicious to me. If I return a hard-coded singleton list (like res.add("something"); return res;), everything is fine, even with one million rows. But if I add all the keys I want (28 strings of length 6-20 chars), the process fails, and only with the big input. The problem is that I need all these keys; this is the actual business logic.
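
To illustrate, here is roughly what the two variants of the flatMap body look like; the parsing in the second one is a simplified placeholder, not the real key-building logic:

// Variant 1 -- hard-coded singleton result; works even with one million rows
public Iterable<String> call(final String s)
{
    final List<String> res = new ArrayList<String>();
    res.add("something");
    return res;
}

// Variant 2 -- roughly the real thing: build ~28 keys per input line
// (splitting on commas here is only a placeholder for the actual computation)
public Iterable<String> call(final String s)
{
    final List<String> res = new ArrayList<String>();
    for (final String field : s.split(",")) {
        res.add(field.trim()); // in reality: 28 derived keys, 6-20 chars each
    }
    return res;
}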

I'm using Linux amd64, quad core, 8 GB RAM, with the latest Oracle Java 7 JDK. Spark config:

SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar

I must mention that when I start the program, it says:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
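
(Per the warning's own suggestion, the bind address could presumably be pinned by setting SPARK_LOCAL_IP in conf/spark-env.sh, along the lines of

SPARK_LOCAL_IP=192.168.1.157

but I have left it at the default.)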

Here is my program. It is based on the JavaWordCount example, minimally modified.

import java.util.ArrayList;
import java.util.List;

import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (a List<String>)
                final List<String> res = new ArrayList<String>();
                // ... add the ~28 keys derived from this line to res ...
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        counts.collect();

        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}
asked May 30 '13 by gyorgyabraham



2 Answers

I managed to fix it by setting the property spark.mesos.coarse to true. More info here.

Update: I've been playing around with Spark for a couple of hours. These settings helped me a little, but it seems nearly impossible to process ~10 million lines of text on a single machine.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load

Note: Increasing the frame size seems particularly helpful at preventing: org.apache.spark.SparkException: Error communicating with MapOutputTracker
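
Note that these are plain Java system properties; as far as I can tell they are only read when the context is created, so they have to be set before the JavaSparkContext is constructed, roughly like this:

// sketch: set the properties at the top of main(), then build the context
System.setProperty("spark.serializer", "spark.KryoSerializer");
System.setProperty("spark.kryoserializer.buffer.mb", "256");
System.setProperty("spark.mesos.coarse", "true");
System.setProperty("spark.akka.frameSize", "500");
System.setProperty("spark.akka.askTimeout", "30");

final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
    System.getenv("SPARK_HOME"), new String[] {"....jar" });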

answered Oct 15 '22 by gyorgyabraham


In newer Spark versions, you should use:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

according to http://spark.apache.org/docs/latest/tuning.html#data-serialization
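
For example, a minimal sketch of the SparkConf-based setup on Spark 1.x and later (the app name here is just illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// configure Kryo through SparkConf instead of system properties
SparkConf conf = new SparkConf()
    .setAppName("JavaWordCount")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext ctx = new JavaSparkContext(conf);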

answered Oct 15 '22 by David Wu