 

Why is this simple Spark program not utilizing multiple cores?

So, I'm running this simple program on a 16-core multicore system. I run it by issuing the following:

spark-submit --master local[*] pi.py

And the code of that program is the following.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

When I use top to see CPU consumption, only 1 core is being utilized. Why is that? Secondly, the Spark documentation says that the default parallelism is contained in the property spark.default.parallelism. How can I read this property from within my Python program?

asked Nov 09 '14 by MetallicPriest



2 Answers

As none of the above really worked for me (maybe because I didn't really understand them), here is my two cents.

I was starting my job with spark-submit program.py, and inside the file I had sc = SparkContext("local", "Test"). I tried to verify the number of cores Spark sees with sc.defaultParallelism; it turned out to be 1. When I changed the context initialization to sc = SparkContext("local[*]", "Test") it became 16 (the number of cores of my system) and my program was using all the cores.

I am quite new to Spark, but my understanding is that local by default means a single core, and since it is set inside the program, it overrides the other settings (in my case it certainly overrode those from configuration files and environment variables).
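Here is a minimal sketch of both points (my own addition; it assumes sc.getConf() is available in your PySpark version, and that spark.default.parallelism may simply be unset unless you pass it explicitly):

from pyspark import SparkContext

# "local[*]" asks Spark to use as many worker threads as there are cores
sc = SparkContext("local[*]", "Test App")

# Default number of partitions for operations like parallelize
print(sc.defaultParallelism)

# The same setting read as a configuration property; falls back if it was never set
print(sc.getConf().get("spark.default.parallelism", "not explicitly set"))

This also covers the second part of the question: any configuration property can be read through sc.getConf().get(...).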

answered Sep 20 '22 by Ivaylo Petrov

Probably because the call to sc.parallelize puts all the data into a single partition. You can specify the number of partitions as the second argument to parallelize:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)

Note that this would still generate the 12.5 million points with one CPU in the driver and only then spread them out over 16 partitions to perform the map and reduce steps.
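If you want to confirm how the data is split, you can check the partition count of the RDD (a quick sketch, not from the original answer; getNumPartitions is a standard PySpark RDD method):

part = 16
rdd = sc.parallelize(xrange(N), part)

# Should print 16 once the partition count is passed explicitly
print(rdd.getNumPartitions())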

A better approach is to do most of the work after the partitioning: for example, the following generates only a tiny array on the driver and then lets each remote task generate the actual random numbers and the subsequent pi approximation:

part = 16
count = ( sc.parallelize([0] * part, part)
.flatMap(lambda blah: [sample(p) for p in xrange(N / part)])
           .reduce(lambda a, b: a + b)
       )

Finally (because the lazier we are, the better), Spark MLlib already comes with random data generation that is nicely parallelized; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So maybe the following is close to what you are trying to do (not tested => probably not working, but it should hopefully be close):

from pyspark.mllib.random import RandomRDDs

count = ( RandomRDDs.uniformRDD(sc, N, part)
          .zip(RandomRDDs.uniformRDD(sc, N, part))
          .filter(lambda (x, y): x*x + y*y < 1)
          .count()
        )
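As a small follow-up (my addition, not part of the original answer): whichever variant you use, the count of points that fall inside the quarter circle turns into the pi estimate exactly as in the question:

# The hit ratio approximates the area of a quarter unit circle, i.e. pi/4
print("Pi is roughly %f" % (4.0 * count / N))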
answered Sep 21 '22 by Svend