 

Parallelize / avoid foreach loop in spark

I wrote a class that gets a DataFrame, does some calculations on it, and can export the results. The DataFrames are generated from a list of keys. I know that I am doing this very inefficiently right now:

var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               // writes the Results into another Dataframe that is saved to HDFS
}

I think the foreach on the Scala list is not parallel, so how can I avoid using foreach here? The calculations on the DataFrames could happen in parallel, since the results of one calculation are NOT input for the next DataFrame - how can I implement this?

Thank you so much!!

Edit:

What I tried to do:

val l = List(34, 32, 132, 352)      // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
    DataContainer.getDataFrame(i)::l        //append DataFrame to List of Dataframes
}

val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
    val x = new MyClass(data)
)

but this gives

Invalid tree; null:
null

Edit 2: Okay, I don't get how everything works under the hood...

1) Everything works fine when I execute this in the spark-shell:

spark-shell --driver-memory 10g
//...
var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.calcSomething()
}

2) Error when I start the same with:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) When I try to parallelize it, I get an error, too:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// I can see the parallel execution in the console messages (my class prints some, and they now appear in parallel instead of sequentially)

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

There are actually more than 10 executors, but only 4 nodes. I never configure the SparkContext; it's already provided on startup.

asked Jun 28 '16 by johntechendso

3 Answers

You can use Scala's parallel collections to achieve foreach parallelism on the driver side.

val l = List(34, 32, 132, 352).par
l.foreach { i =>
  // your code to be run in parallel for each i
}

However, a word of caution: is your cluster capable of running jobs in parallel? You may submit the jobs to your Spark cluster in parallel, but they may end up getting queued on the cluster and executed sequentially.
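
If you do want to bound how many jobs the driver submits at once, you can give the parallel collection its own task support. This is a minimal sketch assuming Scala 2.11 (on Scala 2.12+ the pool class lives in java.util.concurrent instead); the limit of 2 concurrent jobs is an arbitrary example value:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool   // use java.util.concurrent.ForkJoinPool on Scala 2.12+

val keys = List(34, 32, 132, 352).par
// cap driver-side concurrency: at most 2 Spark jobs are submitted at the same time
keys.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))

keys.foreach { i =>
  // same per-key work as in the question
  val data = DataContainer.getDataFrame(i)
  val x = new MyClass(data)
  x.calcSomething()
  x.saveResults()
}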

answered Oct 19 '22 by Sachin Tyagi

You can use Scala's Futures and Spark's fair scheduling, e.g.

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object YourApp extends App { 
  val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR
  var pool = 0
  // this is to have different pools per job, you can wrap it to limit the no. of pools
  def poolId = {
    pool = pool + 1
    pool
  }
  def runner(i: Int) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId.toString) // setLocalProperty expects a String value
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()
  }

  val l = List(34, 32, 132, 352)      // Scala List
  val futures = l map(i => runner(i))

  // now you need to wait all your futures to be completed
  futures foreach(f => Await.ready(f, Duration.Inf))

}

With the fair scheduler and different pools, each concurrent job will get a fair share of the Spark cluster's resources.
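
For reference, a minimal sketch of enabling the fair scheduler when building the context; the app name and the fairscheduler.xml path are placeholders, and the allocation file is only needed if you want to define named pools with their own weights:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallel-jobs")                                          // placeholder app name
  .set("spark.scheduler.mode", "FAIR")                                  // enable the fair scheduler
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional: named pool definitions
val sc = new SparkContext(conf)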

Some references on Scala's Futures can be found in the Scala documentation. You might need to add the necessary callbacks for completion, success, and/or failure.
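
For example, a small sketch that logs the outcome of each job, building on the futures list from the code above (it relies on the globally imported ExecutionContext):

import scala.util.{Failure, Success}

futures.foreach { f =>
  f.onComplete {
    case Success(_)  => println("job finished")
    case Failure(ex) => println(s"job failed: ${ex.getMessage}")
  }
}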

answered Oct 19 '22 by dansuzuki


I did this using something like List.par.foreach{ object => print(object) }. I am using Zeppelin on Spark 2.3. I have a similar use case where I need to get the data day by day and process it separately. This cannot be done using a whole month's data because of some join conditions on the tables I'm using. Here is a sample of my code:

import java.time.LocalDate
import java.sql.Date

var start =  LocalDate.of(2019, 1, 1)
val end   =  LocalDate.of(2019, 2, 1)
var list : List[LocalDate] = List()

var usersDf = spark.read.load("s3://production/users/")
usersDf.createOrReplaceTempView("usersDf")

while (start.isBefore(end)){
    list = start :: list
    start = start.plusDays(1)
}

list.par.foreach{ loopDate =>
    //println(start)
    var yesterday = loopDate.plusDays(-1)
    var tomorrow = loopDate.plusDays(1)
    var lastDay = yesterday.getDayOfMonth()
    var lastMonth = yesterday.getMonthValue()
    var lastYear = yesterday.getYear()

    var day = loopDate.getDayOfMonth()
    var month = loopDate.getMonthValue()
    var year = loopDate.getYear()
    var dateDay = loopDate

    var condition: String = ""
    if (month == lastMonth) {
        condition = s"where year = $year and month = $month and day in ($day, $lastDay)"
    } else {
        condition = s"""where ((year = $year and month = $month and day = $day) or
        (year = $lastYear and month = $lastMonth and day = $lastDay)) 
        """
    }

    //Get events in local timezone
    var aggPbDf = spark.sql(s"""
            with users as (
            select * from usersDf
            where account_creation_date < '$tomorrow'
        )
        , cte as (
            select e.*, date(from_utc_timestamp(to_timestamp(concat(e.year,'-', e.month, '-', e.day, ' ', e.hour), 'yyyy-MM-dd HH'), coalesce(u.timezone_name, 'UTC'))) as local_date
            from events.user_events e
            left join users u
            on u.account_id = e.account_id
            $condition)
        select * from cte
        where local_date = '$dateDay'
    """
    )
    aggPbDf.write.mode("overwrite")
        .format("parquet")
        .save(s"s3://prod-bucket/events/local-timezone/date_day=$dateDay")
}

This will get the data for two days at a time, process it, then write out only the desired output. Running this without par takes about 15 minutes per day, but with par the whole month took 1 hour. This will also depend on what your Spark cluster can support and the size of your data.

answered Oct 19 '22 by Med Zamrik