
PySpark : Setting Executors/Cores and Memory Local Machine

So I have looked at a bunch of posts on PySpark, Jupyter, and setting memory/cores/executors (and the associated memory).

But I appear to be stuck.

Question 1: I don't see my machine utilizing either the cores or the memory. Why? Can I make some adjustments to the executors/cores/memory to optimize the speed of reading the file?

Question 2: Is there any way for me to see a progress bar showing how much of the file has been imported? (spark-monitor doesn't seem to do it.)

I am importing a 33.5 GB file into PySpark.

The machine has 112 GB of RAM and 8 cores / 16 virtual cores.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Summaries") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

conf = spark.sparkContext._conf.setAll([
    ('spark.executor.memory', '4g'),
    ('spark.app.name', 'Spark Updated Conf'),
    ('spark.driver.cores', '4'),
    ('spark.executor.cores', '16'),
    ('spark.driver.memory', '90g'),
])

spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")

I assume that PySpark is doing its magic even while reading the file (so I should see heavy core/memory utilization), but I am not seeing it. Help!

Update: I tested with a smaller gzipped file (89 MB).

PySpark takes 72 seconds, pandas takes 10.6 seconds. Code used:

import time
import pandas as pa  # pandas, imported under the alias used below

# PySpark
start = time.time()
df = spark.read.json("../Data/small.json.gz")
end = time.time()
print(end - start)

# pandas
start = time.time()
df = pa.read_json('../Data/small.json.gz', compression='gzip', lines=True)
end = time.time()
print(end - start)
Asked by pythOnometrist, Aug 13 '20


People also ask

How do I set executor cores in Spark?

Every Spark executor in an application has the same fixed number of cores and the same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, or pyspark from the command line, or by setting the spark.executor.cores property in spark-defaults.conf.
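For example, a minimal sketch of the property-based approach (the app name and core count below are placeholders, not values from the question):

from pyspark.sql import SparkSession

# Equivalent to passing --executor-cores 5 to spark-submit / pyspark,
# or to setting spark.executor.cores in spark-defaults.conf.
spark = (SparkSession.builder
         .appName("cores-example")              # placeholder name
         .config("spark.executor.cores", "5")
         .getOrCreate())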

How many cores does an executor have in Spark?

The consensus in most Spark tuning guides is that 5 cores per executor is the optimum number of cores in terms of parallel processing.

How can we define the executor memory for a Spark Program?

Each cluster worker node contains executors. An executor is a process that is launched for a Spark application on a worker node. Each executor's memory is the sum of the YARN overhead memory and the JVM heap memory.
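As a rough sketch of how those two pieces are configured (the sizes below are illustrative, not recommendations):

from pyspark.sql import SparkSession

# The per-executor footprint requested from YARN is approximately
# spark.executor.memory (JVM heap) + spark.executor.memoryOverhead (off-heap overhead).
spark = (SparkSession.builder
         .appName("memory-example")                      # placeholder name
         .config("spark.executor.memory", "8g")          # JVM heap per executor
         .config("spark.executor.memoryOverhead", "1g")  # overhead outside the heap
         .getOrCreate())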


3 Answers

Although the answer to your question lies in only one of the following issues, let me rewrite your example to explain what is happening.

Setting your configuration

First, you don't need to start and stop a context to set your config. Since Spark 2.0 you can create the SparkSession and then set the config options.

from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("yourAwesomeApp").getOrCreate())
spark.conf.set("spark.executor.memory", "40g")
spark.conf.set("spark.executor.cores", "2")

Reading your data

Spark will lazily evaluate the DAG. The time you are measuring in your snippet is not the loading of the data into the data frame, but just the schema inference for the JSON file. Schema inference is expensive; you should try to avoid it by setting the schema of your data. You will see a big difference in performance between:

df = spark.read.json("../data/a_very_large_json.json.gz")

and

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    LongType,
)

json_schema = StructType([
    StructField('data', StructType([
        StructField("field1", StringType(), nullable=False),
        StructField("field2", StringType(), nullable=False),
        StructField("field3", StringType(), nullable=True),
        StructField("field4", StringType(), nullable=True),
        StructField("field5", LongType(), nullable=False),
    ])),
])
df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)

If you supply the schema, this instruction should be almost instantaneous. As another user has already mentioned, to execute the task you need an action, such as show, head, collect, persist, etc.

df.show()

You can set the number of executor instances and cores in the configuration, but the actual use of those instances also depends on your input data and the transformations/actions you perform. From your description, I assume you are working in standalone mode, so having one executor instance will be the default (using all the cores), and you should set the executor memory to use the amount you have available. As far as I remember, when you work in standalone mode, spark.executor.instances is ignored and the actual number of executors is derived from the number of cores available and spark.executor.cores.
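For instance, a minimal sketch for a single machine like yours (assuming local mode; the values mirror your hardware and are not tuned recommendations):

from pyspark.sql import SparkSession

# In local mode there is a single executor living inside the driver process,
# so local[16] controls the cores and spark.driver.memory controls the heap.
spark = (SparkSession.builder
         .master("local[16]")
         .appName("Summaries")
         .config("spark.driver.memory", "90g")
         .getOrCreate())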

Comparison with pandas

If you are working with only one node, loading the data into a data frame, the comparison between Spark and pandas is unfair. Spark will always have a higher overhead. Spark shines when you have datasets that don't fit in one machine's memory and you have multiple nodes to perform the computation. If you are comfortable with pandas, I think you may be interested in koalas from Databricks.
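A hedged sketch of what that could look like (Koalas ships as a separate package for Spark 2.x/3.0; on Spark 3.2+ the same API lives in pyspark.pandas):

import databricks.koalas as ks  # pip install koalas; on Spark 3.2+ use: import pyspark.pandas as ps

# pandas-like syntax, but the work is executed by Spark under the hood
kdf = ks.read_json("../Data/small.json.gz", lines=True)
print(kdf.head())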

Recommendation

I prefer to set up the execution details outside the application (e.g. using the spark-submit parameters). On rare occasions, to improve performance, you will need to set some of them in the code, but with every new version of Spark this is less frequent. If you can achieve this, your application will be more future-proof and easier to scale.
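A sketch of what that separation could look like (the spark-submit invocation is an assumed example, not taken from the question):

# Assumed invocation, supplying resources outside the code:
#   spark-submit --driver-memory 90g --conf spark.executor.cores=4 your_app.py
from pyspark.sql import SparkSession

# The application itself stays free of hard-coded resource settings
# and simply picks up whatever configuration was submitted.
spark = SparkSession.builder.appName("Summaries").getOrCreate()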

Answered by CronosNull


spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")

Add this:

df.show()
# OR
df.persist()

The comparison you are doing is not apples to apples: Spark performs lazy evaluation, meaning that if you don't call an action on your operation, it will do nothing but compile and keep the DAG ready for you.

In Spark, there are two concepts:

  1. Transformations: evaluated lazily
  2. Actions (like collect(), take(), show(), persist()): evaluated instantly

In your case, read() is just a transformation; adding an action should trigger the computation.
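To make that concrete, here is a small sketch (reusing the smaller file path from the question) that times the transformation and the action separately:

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-eval-demo").getOrCreate()  # placeholder app name

start = time.time()
df = spark.read.json("../Data/small.json.gz")  # transformation: only schema inference runs here
print("read():", time.time() - start)

start = time.time()
df.count()                                     # action: this is what actually scans the data
print("count():", time.time() - start)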

More about actions vs transformation: https://training.databricks.com/visualapi.pdf

Answered by Karan Sharma


The reason your Spark read is slower than pandas is that the gz file is not splittable, so Spark has to read the whole file with a single task. However, when reading an uncompressed file, or a file compressed with a splittable compression format like bzip2, Spark will deploy a number of tasks in parallel (up to the number of cores available in your cluster) to read the file. Try unpacking the file before you pass it to Spark.
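For example, a sketch assuming the file has been decompressed beforehand (e.g. with gunzip, outside Spark):

# Assumes ../Data/inasnelylargefile.json was produced by decompressing the .gz first.
df = spark.read.json("../Data/inasnelylargefile.json")

# With an uncompressed (splittable) file, Spark can split the read across multiple tasks:
print(df.rdd.getNumPartitions())  # should now be greater than 1 on a multi-core machine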

Answered by jayrythium