I'm attempting to use the Python port of Google's phonenumbers
library to normalize 50 million phone numbers. I'm reading a Parquet file on S3 into a Spark DataFrame and then running operations on that DataFrame. The following function, parsePhoneNumber
, is used as a UDF:
import phonenumbers

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except phonenumbers.NumberParseException:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)
        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        return (False, None, None, None, None)
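The UDF itself, parsePhoneNumber_udf, is registered roughly like this (a sketch: the struct return type and its field names are my own, chosen so the result fields can later be selected by name):

from pyspark.sql.functions import udf
from pyspark.sql.types import (StructType, StructField, BooleanType,
                               IntegerType, LongType, StringType)

# Schema mirroring the tuple returned by parsePhoneNumber.
# Field names here are placeholders, not anything mandated by phonenumbers.
phoneSchema = StructType([
    StructField("is_valid", BooleanType()),
    StructField("country_code", IntegerType()),
    StructField("e164", StringType()),
    StructField("national_number", LongType()),
    StructField("extension", StringType()),
])

parsePhoneNumber_udf = udf(parsePhoneNumber, phoneSchema)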
And below is a sample of how I use the UDF to derive new columns:
newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column.national_number")
Executing the UDF by running display(newDataFrame)
or newDataFrame.show(5)
or something similar only uses one executor in the cluster, yet I don't see anything in the UDF itself that would force it to run on only one worker.
If I'm doing anything that would prevent this from running in parallel, can you provide some insight?
The execution environment is on a cloud cluster controlled by Databricks.
Edit: Below is the output of oldDataFrame.explain():
== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...
You are all good. display
, with default arguments, shows at most the first 1000 rows. Similarly, newDataFrame.show(5)
shows only the first five rows.
At the same time, the execution plan (oldDataFrame.explain
) shows no shuffles, so in both cases Spark will evaluate only the minimum number of partitions needed to produce the required number of rows - for these values that is probably a single partition.
If you want to be sure:
- Check that oldDataFrame.rdd.getNumPartitions()
is larger than one.
- If it is, force a full evaluation with df.foreach(lambda _: None)
or newDataFrame.foreach(lambda _: None)
. You should see more active executors.
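For instance (a sketch reusing the DataFrame names from the question):

# Number of partitions backing the source data; if this is > 1,
# a full evaluation can use more than one executor.
print(oldDataFrame.rdd.getNumPartitions())

# A no-op foreach forces Spark to run the UDF over every partition,
# instead of just the few rows needed for show()/display().
newDataFrame.foreach(lambda _: None)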