I have a Weka model stored in S3 that is around 400 MB in size. I have a set of records on which I want to run the model and perform prediction.
To perform the prediction, this is what I have tried:
1. Download and load the model on the driver as a static object, broadcast it to all executors, and perform a map operation on the prediction RDD. Result: not working, because in Weka the model object needs to be modified during prediction, and a broadcast requires a read-only copy.
2. Download and load the model on the driver as a static object and send it to the executors in each map operation. Result: working, but not efficient, because I am passing a 400 MB object in each map operation.
3. Download the model on the driver, load it on each executor, and cache it there. (I don't know how to do that.)
Does anyone have an idea how I can load the model on each executor once and cache it, so that it is not loaded again for other records?
Leave 1 core per node for the Hadoop/YARN daemons => number of cores available per node = 16 - 1 = 15. So, total number of cores available in the cluster = 15 x 10 = 150. Number of available executors = (total cores / cores per executor) = 150 / 5 = 30. Leaving 1 executor for the ApplicationMaster => --num-executors = 29.
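The arithmetic above can be written out as a small function. This is only a sketch: the figures of 10 nodes, 16 cores per node, and 5 cores per executor are read off the numbers in the calculation, and the object and method names are made up for illustration.

```scala
object ExecutorSizing {
  // Follows the rule of thumb above: reserve one core per node for the
  // Hadoop/YARN daemons and one executor for the ApplicationMaster.
  def numExecutors(nodes: Int, coresPerNode: Int, coresPerExecutor: Int): Int = {
    val usableCoresPerNode = coresPerNode - 1          // 16 - 1 = 15
    val totalCores = usableCoresPerNode * nodes        // 15 x 10 = 150
    val executors = totalCores / coresPerExecutor      // 150 / 5 = 30
    executors - 1                                      // 30 - 1 = 29
  }

  def main(args: Array[String]): Unit =
    println(numExecutors(nodes = 10, coresPerNode = 16, coresPerExecutor = 5))
}
```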
Executors are worker nodes' processes in charge of running individual tasks in a given Spark job. They are launched at the beginning of a Spark application and typically run for the entire lifetime of an application. Once they have run the task they send the results to the driver.
Executors reside on the worker nodes. They are launched at the start of a Spark application in coordination with the cluster manager, and they can be dynamically launched and removed by the driver as required. Their job is to run individual tasks and return the results to the driver.
Executors in Spark are processes on the worker nodes that run the individual tasks of a given Spark job. They are launched at the beginning of a Spark application, and as soon as a task has run, its results are sent to the driver.
You have two options:
Create a singleton object with a lazy val that holds the data:
    object WekaModel {
      lazy val data = {
        // initialize data here. This will only happen once per JVM process
      }
    }
Then, you can use the lazy val in your map function. The lazy val ensures that each worker JVM initializes its own instance of the data. No serialization or broadcast is performed for data.
    elementsRDD.map { element =>
      // use WekaModel.data here
    }
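To make the once-per-JVM behaviour concrete, here is a minimal sketch in plain Scala, with no Spark or Weka on the classpath. `StubModel`, `ModelHolder`, and `loadCount` are hypothetical names introduced for illustration; in a real job the body of the lazy val would be where the model file is fetched and deserialized (for Weka, presumably something like `weka.core.SerializationHelper.read`), and the `map` would run on the RDD rather than on a local `Seq`.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the deserialized Weka classifier.
final class StubModel {
  def predict(features: Array[Double]): Double = features.sum
}

object ModelHolder {
  // Counts how often the expensive load ran in this JVM (demonstration only).
  val loadCount = new AtomicInteger(0)

  // Evaluated on first access and then reused. Scala guards lazy val
  // initialization, so concurrent first accesses still run the body once.
  lazy val model: StubModel = {
    loadCount.incrementAndGet()
    // Assumption: the real model download and deserialization would go here.
    new StubModel
  }
}

object LazyHolderDemo {
  def predictAll(records: Seq[Array[Double]]): Seq[Double] =
    // Same shape as elementsRDD.map { element => ... }, here on a local Seq.
    records.map(record => ModelHolder.model.predict(record))

  def main(args: Array[String]): Unit = {
    val records = Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0))
    println(predictAll(records).mkString(", "))
    println(s"model loaded ${ModelHolder.loadCount.get} time(s)")
  }
}
```

One caveat worth checking for this question: tasks running concurrently on the same executor share that JVM, so they would all see the same `model` instance. If Weka really mutates the model during prediction, access may need to be synchronized, or the per-partition approach may be the safer fit.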
Advantages
Disadvantages
Use the mapPartitions (or foreachPartition) method on the RDD instead of just map. This allows you to initialize whatever you need once for the entire partition.
    elementsRDD.mapPartitions { elements =>
      val model = new WekaModel()
      elements.map { element =>
        // use model and element. there is a single instance of model per partition.
      }
    }
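A runnable version of the per-partition pattern might look like the sketch below, assuming Spark is on the classpath and run in local mode. `PartitionModel` is a hypothetical stand-in for the Weka model, and the doubling "prediction" is only there to produce visible output; the real constructor would load and deserialize the model file.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the Weka model; it is built inside the
// partition, so it never has to be serialized or shipped from the driver.
final class PartitionModel {
  def predict(x: Double): Double = x * 2.0
}

object MapPartitionsSketch {
  def run(): Array[Double] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mapPartitions-sketch")
    val sc = new SparkContext(conf)
    try {
      val elementsRDD = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), numSlices = 2)
      val predictions = elementsRDD.mapPartitions { elements =>
        // Runs once per partition, on the executor that processes it.
        val model = new PartitionModel()
        // The returned iterator is lazy; the model is reused for every element.
        elements.map(element => model.predict(element))
      }
      predictions.collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

Because the model is created per partition rather than per JVM, a 400 MB model would be loaded once for every partition, so the number of partitions directly affects how often the load cost is paid.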
Advantages:
Disadvantages