So I looked at a bunch of posts on PySpark, Jupyter, and setting memory/cores/executors (and the associated memory).
But I appear to be stuck.
Question 1: I don't see my machine utilizing either the cores or the memory. Why? Can I adjust the executors/cores/memory to optimize the speed of reading the file?
Question 2: Is there any way for me to see a progress bar showing how much of the file has been imported (spark-monitor doesn't seem to do it)?
I am importing a 33.5 GB file into PySpark.
The machine has 112 GB of RAM and 8 cores / 16 virtual cores.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Summaries") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

conf = spark.sparkContext._conf.setAll([
    ('spark.executor.memory', '4g'),
    ('spark.app.name', 'Spark Updated Conf'),
    ('spark.driver.cores', '4'),
    ('spark.executor.cores', '16'),
    ('spark.driver.memory', '90g'),
])

spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.read.json("../Data/inasnelylargefile.json.gz")
I assume that PySpark is doing its magic even while reading the file (so I should see heavy core/memory utilization), but I am not seeing it. Help!
Update: Tested with a smaller gzipped file (89 MB).
PySpark takes 72 seconds, pandas takes 10.6 seconds. Code used:
import time
import pandas as pa  # pandas, aliased "pa" in this snippet

start = time.time()
df = spark.read.json("../Data/small.json.gz")
end = time.time()
print(end - start)

start = time.time()
df = pa.read_json('../Data/small.json.gz', compression='gzip', lines=True)
end = time.time()
print(end - start)
Every Spark executor in an application has the same fixed number of cores and the same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file.
The consensus in most Spark tuning guides is that 5 cores per executor is the optimum number of cores in terms of parallel processing.
Each cluster worker node contains executors. An executor is a process that is launched for a Spark application on a worker node. Each executor's memory is the sum of the YARN overhead memory and the JVM heap memory.
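For example, here is a minimal sketch of setting these properties through the session builder before the session is created (the values below are illustrative assumptions, not tuned recommendations):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Summaries")
    .config("spark.executor.cores", "5")            # fixed number of cores per executor
    .config("spark.executor.memory", "20g")         # JVM heap per executor
    .config("spark.executor.memoryOverhead", "2g")  # overhead allocated on top of the heap
    .getOrCreate()
)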
Although the answer to your question lies in only one of the following issues, let me rewrite your example to explain what is happening.
First, you don't need to start and stop a context to set your config. Since Spark 2.0 you can create the Spark session and then set the config options.
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("yourAwesomeApp").getOrCreate())
spark.conf.set("spark.executor.memory", "40g")
spark.conf.set("spark.executor.cores", "2")
Spark will lazily evaluate the DAG. The time you are measuring in your snippet is not the load of the data into the data frame, but just the schema inference for the JSON file. Schema inference is expensive, so you should try to avoid it by setting the schema of your data. You will see a big difference in performance between:
df = spark.read.json("../data/a_very_large_json.json.gz")
and
from pyspark.sql.types import (
    LongType,
    StringType,
    StructField,
    StructType,
)

json_schema = StructType([
    StructField('data', StructType([
        StructField("field1", StringType(), nullable=False),
        StructField("field2", StringType(), nullable=False),
        StructField("field3", StringType(), nullable=True),
        StructField("field4", StringType(), nullable=True),
        StructField("field5", LongType(), nullable=False),
    ])),
])

df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)
If you supply the schema, this instruction returns almost instantly. As another user has already mentioned, to actually execute the work you need an action, such as show, head, collect, persist, etc.
df.show()
You can set the number of executor instances and cores in the configuration, but the actual use of those instances also depends on your input data and the transformations/actions you perform. From your description, I assume you are working in standalone mode, so one executor instance is the default (using all the cores), and you should set the executor memory to use what you have available. As far as I remember, in standalone mode spark.executor.instances is ignored and the actual number of executors is based on the number of cores available and spark.executor.cores.
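A minimal sketch of what that could look like on a single machine like yours, assuming the session is created from the notebook in local mode (the 16-thread and 80g figures are assumptions based on the hardware in the question, not recommendations):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[16]")                   # use all 16 virtual cores as local threads
    .appName("Summaries")
    .config("spark.driver.memory", "80g")  # in local mode the work runs inside the driver JVM
    .getOrCreate()
)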
If you are working with only one node and just loading the data into a data frame, the comparison between Spark and pandas is unfair. Spark will always have higher overhead. Spark shines when you have datasets that don't fit in one machine's memory and you have multiple nodes to perform the computation. If you are comfortable with pandas, you might be interested in Koalas from Databricks.
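If you want to try that route, here is a minimal sketch, assuming the koalas package is installed (on recent Spark versions essentially the same API ships as pyspark.pandas):
import databricks.koalas as ks

kdf = ks.read_json("../Data/small.json.gz")  # pandas-like API, backed by Spark under the hood
print(kdf.head())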
I prefer to set up the execution details outside the application (e.g. using the spark-submit parameters). On rare occasions, to improve performance, you will need to set some of them in the code, but with every new version of Spark this is less frequent. If you can achieve this, your application will be more future-proof and easier to scale.
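For example, something along these lines (a sketch only; your_script.py is a placeholder and the resource figures are assumptions):
spark-submit \
    --driver-memory 80g \
    --executor-memory 20g \
    --executor-cores 5 \
    your_script.py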
Your code stops at:
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")
Add this:
df.show()
# or
df.persist()
The comparison you are doing is not apples to apples: Spark performs lazy evaluation, meaning that if you don't call an action on your operation, it will do nothing but compile and keep the DAG ready for you.
In Spark, there are two concepts: transformations and actions.
In your case, read() is just a transformation; adding an action should trigger the computation.
More about actions vs transformations: https://training.databricks.com/visualapi.pdf
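To see the difference yourself, here is a rough sketch reusing the small file from the question, timing the read and an action separately (count() is just one possible action):
import time

start = time.time()
df = spark.read.json("../Data/small.json.gz")  # transformation: only schema inference happens here
print("read:", time.time() - start)

start = time.time()
df.count()  # action: this is what triggers the actual computation
print("count:", time.time() - start)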
The reason your Spark read is slower than pandas is that the gz file is not splittable, so Spark has to read the whole file with a single task. However, when reading an uncompressed file, or a file compressed with a splittable format like bzip2, Spark will deploy multiple tasks in parallel (up to the number of cores available in your cluster) to read it. Try unpacking the file before you pass it to Spark.
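One way to do that from Python, as a sketch (the uncompressed output path is an assumption; you could just as well run gunzip in a shell):
import gzip
import shutil

# Decompress once so Spark can split the read across several parallel tasks.
with gzip.open("../Data/inasnelylargefile.json.gz", "rb") as src, \
        open("../Data/inasnelylargefile.json", "wb") as dst:
    shutil.copyfileobj(src, dst)

df = spark.read.json("../Data/inasnelylargefile.json")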