I'm still fairly new to Spark, but I've been able to build the Spark app I need to reprocess data from our SQL Server using JDBC drivers (we are removing expensive stored procedures). The app loads a few tables from SQL Server via JDBC into DataFrames; then I do a few joins, a group, and a filter, and finally write the results back via JDBC to a different table. All of this runs fine on Spark on EMR in Amazon Web Services, on an m3.xlarge cluster with 2 core nodes, in around a minute.
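Roughly, the pipeline looks like this (a simplified sketch; connection details, table and column names below are placeholders, not the real ones):

import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

// Simplified sketch of the job; all names and credentials are placeholders.
def reprocess(sqlContext: SQLContext): Unit = {
  val url = "jdbc:sqlserver://<host>:1433;databaseName=<db>"
  val props = new Properties()
  props.setProperty("user", "<user>")
  props.setProperty("password", "<password>")

  // Load a few tables from SQL Server into DataFrames
  val orders  = sqlContext.read.jdbc(url, "dbo.orders", props)
  val clients = sqlContext.read.jdbc(url, "dbo.clients", props)

  // A few joins, a group, and a filter
  val result = orders
    .join(clients, orders("client_id") === clients("id"))
    .groupBy(clients("id"))
    .agg(sum(orders("amount")).as("total"))
    .filter("total > 0")

  // Write the results back to a different table over JDBC
  result.write.mode("append").jdbc(url, "dbo.results", props)
}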
My questions are the following:

1. Right now I have 1 master and 2 core nodes in the cluster, but every time I launch a new step, from what I can see in the history server only one executor is being used: two executors are listed, the driver with no usage at all, and an executor with id 1 processing around 1410 tasks. I'm completely unsure how to proceed.
2. This one is specific to AWS, but I didn't want to post two questions since they're related: is there any way to run two steps at the same time, i.e. have two spark-submits of this process running concurrently? We run this process many, many times a day (it processes client data). I know I can launch a new cluster per step, but I want the processing to be fast, and just launching a new cluster takes too long. Thanks!
For your first question:
I am not sure if this is the case, but something similar happened to us and maybe it can help.
If you are reading from the JDBC source using sqlContext.read.format("jdbc").load() (or similar), by default the resulting DataFrame is not partitioned. So, if that's the case for you, applying transformations to the resulting DataFrame without partitioning it first means only one executor can process it. If that's not your case, the following solution will probably not solve your problem.
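A quick way to confirm this (a sketch, assuming connectionOptions holds your base JDBC options such as url, dbtable and credentials) is to check how many partitions the load produced:

// With no partitioning options, the JDBC source produces a single partition,
// so every downstream transformation runs as one task on one executor.
val df = sqlContext.read.format("jdbc").options(connectionOptions).load()
println(df.rdd.partitions.length) // 1 for an unpartitioned JDBC read

(A df.repartition(32) after the load would spread later stages across executors, but the read itself would still happen in a single task, which is why partitioning the read, as below, works better.)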
So, our solution was to create a numeric column with values from 1 to 32 (our desired number of partitions) in the data and use it as the partitioning column, by setting the partitioning options of the JDBC reader (see the JDBC data source options in the Spark SQL documentation):
// Base JDBC options (url, dbtable, user, password, driver, ...)
val connectionOptions = Map[String, String](... <connection options> ...)

// Partitioning options: Spark issues 32 parallel queries, one per
// slice of the 1..32 partition column.
val options = connectionOptions ++ Map[String, String](
  "partitionColumn" -> "column name",  // the numeric 1..32 column described above
  "lowerBound"      -> "1",
  "upperBound"      -> "32",
  "numPartitions"   -> "32"
)

val df = sqlContext.read.format("jdbc").options(options).load()
So, with this approach, not only could the read itself run in parallel (really improving performance and avoiding OOM errors), but the resulting DataFrame came back partitioned, so all subsequent transformations were processed in parallel as well.
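If adding a physical column to the table isn't convenient, a variation that should also work (table and column names here are hypothetical; passing a subquery as the dbtable option is supported by the Spark JDBC source) is to derive the bucket in a subquery:

// Hypothetical: derive a 1..32 bucket from an existing numeric key instead of
// materializing a new column. Spark appends a WHERE clause on part_col to
// each of the 32 partition queries it sends to SQL Server.
val bucketed =
  "(SELECT t.*, (ABS(t.id) % 32) + 1 AS part_col FROM dbo.my_table t) AS src"

val partitionedDf = sqlContext.read.format("jdbc")
  .options(connectionOptions ++ Map(
    "dbtable"         -> bucketed,
    "partitionColumn" -> "part_col",
    "lowerBound"      -> "1",
    "upperBound"      -> "32",
    "numPartitions"   -> "32"
  ))
  .load()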
I hope that helps.