I wrote a class that gets a DataFrame, does some calculations on it and can export the results. The Dataframes are generated by a List of Keys. I know that i am doing this in a very unefficient way right now:
var l = List(34, 32, 132, 352) // Scala List
l.foreach{i =>
val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
val x = new MyClass(data) // initialize MyClass with new Object
x.setSettings(...)
x.calcSomething()
x.saveResults() // writes the Results into another Dataframe that is saved to HDFS
}
I think the foreach on the Scala list is not parallel, so how can i avoid using foreach here? The calculation the DataFrames could happen in parallel, as results of the calculations are NOT input for the next DataFrame - how can i implement this?
Thank you so much!!
__edit:
what i tried to do:
val l = List(34, 32, 132, 352) // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
DataContainer.getDataFrame(i)::l //append DataFrame to List of Dataframes
}
val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
val x = new MyClass(data)
)
but gives
Invalid tree; null:
null
edit 2: Okay, i don´t get how evrything works under the hood....
1) Everything works fine when i execute this in spark-shell
spark-shell –driver-memory 10g
//...
var l = List(34, 32, 132, 352) // Scala List
l.foreach{i =>
val data:DataFrame = AllData.where($"a" === i) // get DataFrame
val x = new MyClass(data) // initialize MyClass with new Object
x.calcSomething()
}
2) Error, when i start the same with
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
3) when i try to parallelize it, i get a error, too
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)
There are actually more than 10 executors, but 4 nodes. I never configure the spark-context. It´s already given on startup.
You can use scala's parallel collections to achieve foreach
parallelism on the driver side.
val l = List(34, 32, 132, 352).par
l.foreach{i => // your code to be run in parallel for each i}
*However, a word of caution: is your cluster capable of running jobs parallely? You may submit the jobs to your spark cluster parallely but they may end up getting queued on the cluster and get executed sequentially.
You can use scala's Future and Spark Fair Scheduling, e.g.
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object YourApp extends App {
val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR
var pool = 0
// this is to have different pools per job, you can wrap it to limit the no. of pools
def poolId = {
pool = pool + 1
pool
}
def runner(i: Int) = Future {
sc.setLocalProperty("spark.scheduler.pool", poolId)
val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
val x = new MyClass(data) // initialize MyClass with new Object
x.setSettings(...)
x.calcSomething()
x.saveResults()
}
val l = List(34, 32, 132, 352) // Scala List
val futures = l map(i => runner(i))
// now you need to wait all your futures to be completed
futures foreach(f => Await.ready(f, Duration.Inf))
}
With FairScheduler and different pools, each concurrent job will have a fair share of the spark cluster resources.
Some reference regarding scala's future here. You might need to add necessary callbacks on completion, success, and/or failures.
I did this using something like using List.par.foreach{object => print(object)}
.
I am using Zeppelin on Spark 2.3. I have a similar use case where I need to get the data day by day, and process it separately. This cannot be done using a whole month data because of some join conditions on the tables I'm using. Here is a sample of my code:
import java.time.LocalDate
import java.sql.Date
var start = LocalDate.of(2019, 1, 1)
val end = LocalDate.of(2019, 2, 1)
var list : List[LocalDate] = List()
var usersDf = spark.read.load("s3://production/users/")
usersDf.createOrReplaceTempView("usersDf")
while (start.isBefore(end)){
list = start :: list
start = start.plusDays(1)
}
list.par.foreach{ loopDate =>
//println(start)
var yesterday = loopDate.plusDays(-1)
var tomorrow = loopDate.plusDays(1)
var lastDay = yesterday.getDayOfMonth()
var lastMonth = yesterday.getMonthValue()
var lastYear = yesterday.getYear()
var day = loopDate.getDayOfMonth()
var month = loopDate.getMonthValue()
var year = loopDate.getYear()
var dateDay = loopDate
var condition: String = ""
if (month == lastMonth) {
condition = s"where year = $year and month = $month and day in ($day, $lastDay)"
} else {
condition = s"""where ((year = $year and month = $month and day = $day) or
(year = $lastYear and month = $lastMonth and day = $lastDay))
"""
}
//Get events in local timezone
var aggPbDf = spark.sql(s"""
with users as (
select * from users
where account_creation_date < '$tomorrow'
)
, cte as (
select e.* date(from_utc_timestamp(to_timestamp(concat(e.year,'-', e.month, '-', e.day, ' ', e.hour), 'yyyy-MM-dd HH'), coalesce(u.timezone_name, 'UTC'))) as local_date
from events.user_events e
left join users u
on u.account_id = e.account_id
$condition)
select * from cte
where local_date = '$dateDay'
"""
)
aggPbDf.write.mode("overwrite")
.format("parquet")
.save(s"s3://prod-bucket/events/local-timezone/date_day=$dateDay")
}
This will get the data for two days, process it, then write out only the desired output. Running this without par
will take about 15 minutes per day, but with par
it took 1 hour for the whole month. This will also depend on what your spark cluster can support and the size of your data.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With