I want to understand what is happening under the hood when I run the following script named t1.py with python3 t1.py. Specifically, I have the following questions:
#!/home/python3/venv/bin/python3
#this file is named t1.py
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, IntegerType
import pyspark.sql.functions as F
from operator import add
import pandas as pd
from datetime import datetime
len = int(100000000/1)
print("len=", len)
spark = SparkSession.builder.appName('ai_project').getOrCreate()
start = datetime.now()
t = spark.sparkContext.parallelize(range(len))
a = t.reduce(add)
print(a)
end = datetime.now()
print("end for spark rdd sum:", end, end - start)
In PySpark, Python and JVM code live in separate OS processes. PySpark uses Py4J, a framework that facilitates interoperation between the two languages, to exchange data and commands between the Python process and the JVM process.
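You can make that bridge visible yourself. In this sketch, _jvm is an internal PySpark attribute (the Py4J gateway's view of the driver JVM), used here purely for illustration, not as a public API:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ai_project').getOrCreate()
# Attribute access on _jvm is translated by Py4J into calls on classes
# living inside the driver JVM process.
jvm_time = spark.sparkContext._jvm.java.lang.System.currentTimeMillis()
print(jvm_time)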
When you launch a PySpark job, it starts as a Python process, which then spawns a JVM instance and runs some PySpark-specific code in it. The Python process then instantiates a Spark session in that JVM, which becomes the driver program that Spark sees. That driver program either connects to a Spark master or spawns an in-process one, depending on how the session is configured.
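For example, which of the two happens is decided by the master URL passed to the builder (the cluster URL below is hypothetical):
from pyspark.sql import SparkSession
# Local mode: an in-process master runs inside the JVM the Python driver spawned.
spark = SparkSession.builder.master("local[*]").appName("ai_project").getOrCreate()
# Cluster mode: the driver JVM would instead connect to an external master, e.g.
# SparkSession.builder.master("spark://my-master:7077").appName("ai_project").getOrCreate()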
When you create RDDs or DataFrames, they are stored in the memory of the Spark cluster just like RDDs and DataFrames created by Scala or Java applications. Transformations and actions on them work just as they do in the JVM, with one notable difference: anything that involves passing the data through Python code runs outside the JVM. So, if you create a DataFrame and do something like:
df.select("foo", "bar").where(df["foo"] > 100).count()
this runs entirely in the JVM, as there is no Python code that the data must pass through. On the other hand, if you do:
a = t.reduce(add)
since add is a Python callable, the RDD partitions get serialised and sent to one or more Python worker processes, where the partial reductions are performed; the partial results are then serialised again, returned to the JVM, and finally transferred over to the Python driver process for the final reduction.
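You can see the cost of that boundary by computing the same sum in a way that never leaves the JVM. A rough sketch (timings will vary with your setup), using the same n as in your script:
from datetime import datetime
from operator import add
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ai_project').getOrCreate()
n = 100000000
# Python path: each partition is shipped to a Python worker and reduced there.
start = datetime.now()
rdd_sum = spark.sparkContext.parallelize(range(n)).reduce(add)
print("rdd reduce:", rdd_sum, datetime.now() - start)
# JVM-only path: spark.range() and sum() are planned and executed inside the JVM.
start = datetime.now()
df_sum = spark.range(n).agg(F.sum("id")).collect()[0][0]
print("dataframe sum:", df_sum, datetime.now() - start)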
The way this works (which covers your Q1) is roughly like this: PySpark takes the Python function (here, the add operator) and pickles it together with some additional data. The JVM executors use network sockets to talk to the Python subprocesses they spawn, and the special PySpark worker scripts they launch run a loop whose task is to sit there and wait for serialised data and bytecode to run.
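As a loose analogy of the pickling step, using the standard pickle module (PySpark actually ships its own copy of cloudpickle, which can also serialise lambdas and closures that plain pickle cannot):
import pickle
from operator import add
# The function itself becomes bytes that can be sent over a socket to a
# Python worker process, which unpickles it and calls it on the data.
payload = pickle.dumps(add)
func = pickle.loads(payload)
print(func(2, 3))  # 5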
Regarding Q3, the JVM executors transfer whole RDD partitions to the Python subprocesses, not single items. You should strive to use Pandas UDFs, since those can be vectorised.
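A minimal sketch of a vectorised Pandas UDF (Spark 3.x style type hints, requires pyarrow; the function and column here are made up for illustration). It receives a whole pandas.Series per batch instead of one Python call per row:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
spark = SparkSession.builder.appName('ai_project').getOrCreate()
df = spark.range(1000000)  # single column named "id"
@pandas_udf("long")
def plus_one(batch: pd.Series) -> pd.Series:
    # Called once per Arrow batch, so the work is vectorised in pandas/NumPy.
    return batch + 1
df.select(plus_one("id")).show(5)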
If you are interested in the details, start with the source code of python/pyspark/rdd.py and take a look at the RDD class.