Spark UDF not running in parallel

I'm attempting to use the Python port of the Google phonenumbers library to normalize 50 million phone numbers. I'm reading a Spark DataFrame from a Parquet file on S3 and then running operations on the DataFrame. The following function, parsePhoneNumber, is expressed as a UDF:

import phonenumbers

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except phonenumbers.NumberParseException:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)

        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        # return a tuple of the same arity as the valid case so the UDF's output stays consistent
        return (False, None, None, None, None)
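
For reference, parsePhoneNumber_udf (used below) is created roughly as follows; the return schema isn't shown in this post, so the StructType and field names here are an assumption based on the tuple the function returns:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, BooleanType, IntegerType, LongType, StringType

# Assumed schema matching the five-element tuple returned by parsePhoneNumber
phoneSchema = StructType([
    StructField("is_valid", BooleanType()),
    StructField("country_code", IntegerType()),
    StructField("e164", StringType()),
    StructField("national_number", LongType()),
    StructField("extension", StringType()),
])

parsePhoneNumber_udf = udf(parsePhoneNumber, phoneSchema)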

And below is a sample of how I use the UDF to derive new columns:

newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column.national_number")

Executing the UDF by running display(newDataFrame), newDataFrame.show(5), or something similar only uses one executor in the cluster, and it doesn't appear that anything in the UDF itself should cause it to run on only one worker.

If I'm doing anything that would prevent this from running in parallel, can you provide some insight?

The execution environment is on a cloud cluster controlled by Databricks.

Edit: Below is the output of oldDataFrame.explain():

== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...
asked Jan 23 '18 by Sean Lindo



1 Answer

You are all good. display, with default arguments, shows at most the first 1000 rows. Similarly, newDataFrame.show(5) shows only the first five rows.

At the same time, the execution plan (oldDataFrame.explain()) shows no shuffles, so in both cases Spark evaluates only the minimum number of partitions needed to produce the required number of rows - for these values that is probably a single partition.

If you want to be sure:

  • Check whether oldDataFrame.rdd.getNumPartitions() is larger than one.
  • If it is, force execution of all partitions with oldDataFrame.foreach(lambda _: None) or newDataFrame.foreach(lambda _: None), as in the sketch after this list.
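
A minimal sketch of those two checks, assuming the DataFrames from the question are in scope:

# Number of partitions backing the source data; more than one allows parallel tasks
print(oldDataFrame.rdd.getNumPartitions())

# No-op action that touches every row, forcing every partition (and the UDF) to be evaluated
newDataFrame.foreach(lambda _: None)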

You should see more active executors.

answered Sep 19 '22 by Alper t. Turker