I want to understand a basic thing in Spark Streaming. I have 50 Kafka topic partitions and 5 executors, and I am using the direct API, so the number of RDD partitions will be 50. How will these partitions be processed on the 5 executors? Will Spark process one partition at a time on each executor, or, if an executor has enough memory and cores, will it process more than one partition in parallel?
Spark will process the partitions based on the total number of cores available to the job you're running.
Say your streaming job has 10 executors, each with 2 cores. This means you'll be able to process 10 x 2 = 20 partitions concurrently, assuming spark.task.cpus is set to 1 (the default).
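As a rough illustration, here is a minimal sketch of that arithmetic expressed through the standard configuration properties; the app name and the specific numbers are placeholders matching the example above:

    import org.apache.spark.SparkConf

    // Hypothetical configuration matching the example: 10 executors x 2 cores.
    val conf = new SparkConf()
      .setAppName("streaming-slots-example")  // placeholder name
      .set("spark.executor.instances", "10")  // 10 executors...
      .set("spark.executor.cores", "2")       // ...with 2 cores each
      .set("spark.task.cpus", "1")            // 1 core per task (the default)

    // Concurrent task slots = executors * cores-per-executor / cpus-per-task
    val concurrentPartitions = 10 * 2 / 1     // = 20 partitions in parallel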
If you really want the details, look at how Spark Standalone requests resources from CoarseGrainedSchedulerBackend; in particular, you can look at its makeOffers method:
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }
The key here is executorDataMap, which holds a mapping from executor id to an ExecutorData object describing, among other things, how many cores each executor in the system currently has free. Based on that, and on the preferred locality of the partition, the scheduler makes an educated guess about which executor each task should run on.
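To make the arithmetic concrete, here is a simplified sketch (not the real Spark code) of how the free cores in those offers turn into schedulable task slots; the executor ids and host names are made up:

    // Simplified stand-in for Spark's WorkerOffer.
    case class Offer(executorId: String, host: String, freeCores: Int)

    val cpusPerTask = 1  // spark.task.cpus

    // Three hypothetical executors, each reporting 2 free cores.
    val offers = Seq(
      Offer("exec-1", "host-a", 2),
      Offer("exec-2", "host-b", 2),
      Offer("exec-3", "host-c", 2)
    )

    // Each executor can run freeCores / cpusPerTask tasks at once, so the
    // scheduler can launch this many partition-processing tasks in one round:
    val totalSlots = offers.map(_.freeCores / cpusPerTask).sum  // = 6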
Here is an example from a live Spark Streaming app consuming from Kafka:
We have 5 partitions and 3 executors running, where each executor has more than 2 cores, which enables the streaming job to process every partition concurrently.
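For reference, here is a minimal sketch of such a direct-stream setup, using the spark-streaming-kafka-0-10 API; the broker address, group id, and topic name are placeholders. With the direct approach, each Kafka partition maps 1:1 to an RDD partition, which is why a 50-partition topic yields 50-partition RDDs:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    val conf = new SparkConf().setAppName("kafka-direct-example")  // placeholder
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",  // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group"          // placeholder group id
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams)
    )

    // Each batch RDD has one partition per Kafka topic partition.
    stream.foreachRDD(rdd => println(s"Partitions this batch: ${rdd.getNumPartitions}"))

    ssc.start()
    ssc.awaitTermination()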