How does Spark achieve parallelism within one task on multi-core or hyper-threaded machines

I have been reading and trying to understand how the Spark framework uses its cores in Standalone mode. According to the Spark documentation, the parameter "spark.task.cpus" defaults to 1, and is described as the number of cores to allocate for each task.
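For reference, this is the kind of setup I mean (a minimal sketch; the app name and standalone master URL are just placeholders):

```scala
// Minimal sketch of setting spark.task.cpus in standalone mode.
// The app name and master URL below are placeholders, not real values.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("task-cpus-test")            // placeholder app name
  .setMaster("spark://master-host:7077")   // placeholder standalone master URL
  .set("spark.task.cpus", "4")             // cores to allocate for each task
val sc = new SparkContext(conf)
```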

Question 1: For a multi-core machine (e.g., 4 cores in total, 8 hardware threads), when "spark.task.cpus = 4", will Spark use 4 cores (1 thread per core) or 2 cores with hyper-threading?

What will happen if I set "spark.task.cpus = 16", which is more than the number of available hardware threads on this machine?

Question 2: How is this type of hardware parallelism achieved? I tried to look into the code but couldn't find anything that communicates with the hardware or the JVM for core-level parallelism. For example, if the task is a "filter" function, how is a single filter task split across multiple cores or threads?

Maybe I am missing something. Is this related to the Scala language?

asked Apr 17 '16 by Nodame


1 Answer

To answer your title question, Spark by itself does not give you parallelism gains within a task. The main purpose of the spark.task.cpus parameter is to allow for tasks that are multithreaded by nature. If you call an external multithreaded routine within each task, or you want to encapsulate the finest level of parallelism yourself at the task level, you may want to set spark.task.cpus to more than 1.

  • Setting this parameter to more than 1 is not something you would do often, though.

    • The scheduler will not launch a task if the number of available cores is less than the cores required by the task, so if your executor has 8 cores, and you've set spark.task.cpus to 3, only 2 tasks will get launched.
    • If your task does not consume the full capacity of the cores all the time, you may find that using spark.task.cpus=1 and experiencing some contention within the task still gives you more performance.
    • Overhead from things like GC or I/O probably shouldn't be included in the spark.task.cpus setting, because it'd probably be a much more static cost that doesn't scale linearly with your task count.
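For illustration, here is a minimal sketch of such a multithreaded task: the per-task thread pool is the kind of thing spark.task.cpus > 1 is meant to account for. The pool size, the chunk size, and the existing SparkContext sc are assumptions for the example, not anything Spark requires.

```scala
// A sketch of a "multithreaded task": each Spark task processes its own partition
// with a local thread pool, so reserving extra cores per task (spark.task.cpus)
// keeps the scheduler from oversubscribing the executor.
import java.util.concurrent.{Callable, Executors}
import scala.collection.JavaConverters._

val total = sc.parallelize(1 to 1000000, 8).mapPartitions { iter =>
  val pool = Executors.newFixedThreadPool(4)  // matches spark.task.cpus = 4 in this example
  try {
    val chunks = iter.grouped(10000).map { chunk =>
      new Callable[Long] { def call(): Long = chunk.map(_.toLong).sum }
    }.toList
    // invokeAll blocks until every chunk has been summed on the pool's threads
    pool.invokeAll(chunks.asJava).asScala.map(_.get()).iterator
  } finally {
    pool.shutdown()
  }
}.reduce(_ + _)
```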

Question 1: For a multi-core machine (e.g., 4 cores in total, 8 hardware threads), when "spark.task.cpus = 4", will Spark use 4 cores (1 thread per core) or 2 cores with hyper-threading?

The JVM will almost always rely on the OS to provide it with info and mechanisms to work with CPUs, and AFAIK Spark doesn't do anything special here. If Runtime.getRuntime().availableProcessors() or ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors() return 4 for your dual-core HT-enabled Intel® processor, Spark will also see 4 cores.
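If you want to verify this yourself, a trivial check looks like the following; on an HT-enabled machine both calls report logical processors (hardware threads), not physical cores:

```scala
// What the JVM (and hence Spark's defaults) see on this machine.
import java.lang.management.ManagementFactory

println(Runtime.getRuntime.availableProcessors)
println(ManagementFactory.getOperatingSystemMXBean.getAvailableProcessors)
```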

Question 2: How is this type of hardware parallelism achieved? I tried to look into the code but couldn't find anything that communicates with the hardware or the JVM for core-level parallelism. For example, if the task is a "filter" function, how is a single filter task split across multiple cores or threads?

As mentioned above, Spark won't automatically parallelize a task according to the spark.task.cpus parameter. Spark is mostly a data parallelism engine, and its parallelism is achieved mostly through representing your data as RDDs.
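To make that concrete, here is a minimal sketch (assuming an existing SparkContext sc): a filter is not split up within one task; instead the data is split into partitions, and each partition gets its own single-threaded filter task.

```scala
// Data parallelism, not task parallelism: 8 partitions -> 8 independent filter tasks,
// each running single-threaded on one core of some executor.
val rdd   = sc.parallelize(1 to 1000000, 8)   // 8 partitions
val evens = rdd.filter(_ % 2 == 0)            // narrow transformation; partitioning is preserved
println(evens.partitions.length)              // still 8
println(evens.count())                        // up to 8 tasks run in parallel, one per partition
```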

answered Dec 05 '22 by Dimitar Dimitrov