I'm playing with Spark. It is the default, pre-built distribution (0.7.0) from the website, with default config, cluster mode, one worker (my localhost). I read the docs on installing and everything seems fine.
I have a CSV file (various sizes, 1000 - 1million rows). If I run my app with small input file (for example the 1000 rows), everything is fine, the program is done in seconds and produces the expected output. But when I supply a bigger file (100.000 rows, or 1million), the execution fails. I tried to dig in the logs, but did not help much (it repeats the whole process about 9-10 times and exitst with fail after that. Also, there is some error related to fetching from some null source failed).
The result Iterable returned by the first JavaRDD is suspicious for me. If I return a hard-coded, singleton list (like res.add("something"); return res;), everything is fine, even with one million rows. But if I add all my keys I want (28 strings of lenght 6-20 chars), the process fails only with the big input. The problem is, I need all these keys, this is the actual business logic.
I'm using Linux amd64, quad core, 8GB ram. Latest Oracle Java7 JDK. Spark config:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
I must mention that when I start the program, it says:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Here is my program. It is based on the JavaWordCount example, minimally modified.
public final class JavaWordCount
{
public static void main(final String[] args) throws Exception
{
final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), new String[] {"....jar" });
final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(final String s)
{
// parsing "s" as the line, computation, building res (it's a List<String>)
return res;
}
});
final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(final String s)
{
return new Tuple2<String, Integer>(s, 1);
}
});
final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer i1, final Integer i2)
{
return i1 + i2;
}
});
counts.collect();
for (Tuple2<?, ?> tuple : counts.collect()) {
System.out.println(tuple._1 + ": " + tuple._2);
}
}
}
Increasing the no. of nodes decreases performance.
Click on the HDFS Web UI. A new web page is opened to show the Hadoop DFS (Distributed File System) health status. Click on the Spark Web UI. Another web page is opened showing the spark cluster and job status.
How large a cluster can Spark scale to? Many organizations run Spark on clusters of thousands of nodes. The largest cluster we know has 8000 of them. In terms of data size, Spark has been shown to work well up to petabytes.
I managed to fix it by setting the property spark.mesos.coarse to true. More info here.
Update: I've been playing around with Spark for a couple of hours. These settings helped me a little, but it seems it's nearly impossible to process ~10million lines of text on a single machine.
System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load
Note: Increasing the frame size seems particularly helpful at preventing: org.apache.spark.SparkException: Error communicating with MapOutputTracker
in the newer spark version, should use:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
according to http://spark.apache.org/docs/latest/tuning.html#data-serialization
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With