We are running Spark (Java) in local mode on a single AWS EC2 instance, using "local[*]" as the master.
However, profiling with New Relic and a simple top shows that only one of the 16 CPU cores on our machine is ever in use, for three different Spark jobs we've written (we've also tried other AWS instance types, but still only one core is ever used).
Runtime.getRuntime().availableProcessors() reports 16 processors, and sparkContext.defaultParallelism() also reports 16.
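For reference, the context is created roughly like this (a minimal sketch, not our exact code; the class name is illustrative):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalModeCheck {
    public static void main(String[] args) {
        // "local[*]" should allocate one worker thread per available core
        SparkConf conf = new SparkConf()
                .setAppName("local-mode-check")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Both of these report 16 on our instance
        System.out.println("availableProcessors = "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("defaultParallelism  = "
                + sc.defaultParallelism());

        sc.stop();
    }
}
```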
I've looked at various Stack Overflow questions about local mode issues, but none seems to have resolved this.
Any advice much appreciated.
Thanks
EDIT: Process (a condensed code sketch of the whole pipeline follows the list)
1) Use sqlContext to read gzipped CSV file 1 with com.databricks.spark.csv from disk (S3) into DataFrame DF1.
2) Use sqlContext to read gzipped CSV file 2 with com.databricks.spark.csv from disk (S3) into DataFrame DF2.
3) Call DF1.toJavaRDD().mapToPair() with a mapping function that returns a Tuple2 of (key, List of values), giving RDD1.
4) Call DF2.toJavaRDD().mapToPair() with a mapping function that returns a Tuple2 of (key, List of values), giving RDD2.
5) Call union() on the two RDDs.
6) Call reduceByKey() on the unioned RDD to "merge by key", so that each key appears in only one Tuple2 of (key, List of values) (the same key appears in both RDD1 and RDD2).
7) Call .values().map() with a mapping function that iterates over all items in the provided List and merges them as required, returning a List of the same or smaller length.
8) Call .flatMap() to get an RDD of DomainClass objects.
9) Use sqlContext to create a DataFrame of type DomainClass from the flat-mapped RDD.
10) Use DF.coalesce(1).write() to write the DF as gzipped CSV to S3.
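Condensed, the pipeline looks roughly like this (a sketch only: class names, paths, and the header option are illustrative, and the key-extraction/merge functions are stubbed out because they are domain specific):

```java
import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

public class MergeJob {

    // Stand-in for the real domain class; fields omitted.
    public static class DomainClass implements Serializable { }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("merge-job").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);

        // Steps 1-2: read the two gzipped CSVs from S3 with spark-csv.
        DataFrame df1 = readCsv(sqlContext, "s3n://bucket/file1.csv.gz"); // hypothetical path
        DataFrame df2 = readCsv(sqlContext, "s3n://bucket/file2.csv.gz"); // hypothetical path

        // Steps 3-4: map each row to (key, List of items).
        JavaPairRDD<String, List<DomainClass>> rdd1 = df1.toJavaRDD().mapToPair(MergeJob::toKeyedList);
        JavaPairRDD<String, List<DomainClass>> rdd2 = df2.toJavaRDD().mapToPair(MergeJob::toKeyedList);

        // Steps 5-6: union, then merge the lists that share a key.
        JavaPairRDD<String, List<DomainClass>> merged =
                rdd1.union(rdd2).reduceByKey(MergeJob::concatenate);

        // Steps 7-8: collapse each list, then flatten into one RDD of DomainClass.
        JavaRDD<DomainClass> flat = merged.values()
                .map(MergeJob::mergeItems)
                .flatMap(items -> items);   // Spark 1.x flatMap expects an Iterable

        // Steps 9-10: back to a DataFrame, written as a single gzipped CSV to S3.
        DataFrame out = sqlContext.createDataFrame(flat, DomainClass.class);
        out.coalesce(1)
           .write()
           .format("com.databricks.spark.csv")
           .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
           .save("s3n://bucket/output");    // hypothetical path
    }

    private static DataFrame readCsv(SQLContext sqlContext, String path) {
        return sqlContext.read()
                .format("com.databricks.spark.csv")
                .option("header", "true")
                .load(path);
    }

    // The key extraction and merge logic are domain specific; stubs only.
    private static Tuple2<String, List<DomainClass>> toKeyedList(Row row) {
        throw new UnsupportedOperationException("domain-specific mapping omitted");
    }

    private static List<DomainClass> concatenate(List<DomainClass> a, List<DomainClass> b) {
        throw new UnsupportedOperationException("domain-specific merge omitted");
    }

    private static List<DomainClass> mergeItems(List<DomainClass> items) {
        throw new UnsupportedOperationException("domain-specific merge omitted");
    }
}
```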
Local mode, also known as Spark in-process, is the default mode of Spark. It does not require any resource manager and runs everything on the same machine, so you can simply download Spark and run it without installing a cluster manager.
It's generally much easier to test your code locally (on a smaller data set, one assumes) before deploying to a cluster, and Spark makes that easy.
The most common way to launch Spark applications on a cluster is the spark-submit shell command. With spark-submit, the application does not need to be configured specifically for each cluster, because the script talks to the different cluster managers through a single interface.
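As a sketch (not the asker's code): if the master is left out of the application itself, the same jar can be run locally for testing or submitted to a cluster, with spark-submit supplying the master.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SubmitFriendlyApp {
    public static void main(String[] args) {
        // No setMaster() here: the master comes from spark-submit
        // (e.g. --master local[*] for local testing, --master yarn on a cluster).
        SparkConf conf = new SparkConf().setAppName("submit-friendly-app");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job logic ...
        sc.stop();
    }
}
```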
I think your problem is that your CSV files are gzipped. When Spark reads files, it loads them in parallel, but it can only do this if the file's codec is splittable. Plain (non-gzipped) text and Parquet are splittable, as is the bgzip codec used in genomics (my field). With gzip, each of your files ends up in a single partition.
Try decompressing the csv.gz files and running this again. I think you'll see much better results!
Edit: I replicated this behavior on my machine: using sc.textFile on a 3 GB gzipped text file produced an RDD with a single partition.
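A quick way to confirm this on your own files (a sketch; the paths are illustrative):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionCheck {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("partition-check").setMaster("local[*]"));

        // Gzip is not splittable, so the whole file lands in a single partition.
        System.out.println("gzipped:      "
                + sc.textFile("data/big.csv.gz").partitions().size());

        // The same data uncompressed is split into many partitions,
        // which is what lets local[*] keep all the cores busy.
        System.out.println("uncompressed: "
                + sc.textFile("data/big.csv").partitions().size());

        sc.stop();
    }
}
```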