I have a dataset that I want to map over using several Pyspark SQL Grouped Map UDFs, at different stages of a larger ETL process that runs on ephemeral clusters in AWS EMR. The Grouped Map API requires that the Pyspark dataframe be grouped prior to the apply, but I have no need to actually group keys.
At the moment, I'm using an arbitrary grouping, which works, but results in:
- An unnecessary shuffle.
- Hacky code for an arbitrary groupby in each job (sketched below).
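For context, the arbitrary-grouping version looks roughly like this (the synthetic key expression and the bucket count are just placeholders, not my exact code):
from pyspark.sql.functions import monotonically_increasing_id

# Hypothetical "arbitrary grouping": bucket rows by a synthetic key purely to
# satisfy the groupBy required by the Grouped Map API. The modulo value is
# arbitrary and only controls how many groups the shuffle produces.
(sql
 .read.parquet(a_path)
 .withColumn("gid", monotonically_increasing_id() % 1000)
 .groupBy("gid")
 .apply(transform)
 .write.parquet(b_path))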
My ideal solution would allow a vectorized Pandas UDF apply without any arbitrary grouping, but if I have to keep the arbitrary grouping, I'd at least like to eliminate the shuffles.
EDIT:
Here's what my code looks like. I was originally using an arbitrary grouping, but am currently trying spark_partition_id(), based on a comment below by @pault.
from pyspark.sql.functions import pandas_udf, spark_partition_id, PandasUDFType

@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
    # Drop the synthetic partition-id column before doing the real work
    b = a_partition.drop("pid", axis=1)
    # Some other transform stuff
    return b

(sql
 .read.parquet(a_path)
 .withColumn("pid", spark_partition_id())
 .groupBy("pid")
 .apply(transform)
 .write.parquet(b_path))
Using spark_partition_id() still seems to result in a shuffle. I get the following DAG:
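(As a sanity check without the UI, the physical plan can also be printed; assuming the same pipeline as above, an Exchange node in the output corresponds to the shuffle.)
# Sketch: inspect the physical plan; an Exchange node indicates a shuffle.
(sql
 .read.parquet(a_path)
 .withColumn("pid", spark_partition_id())
 .groupBy("pid")
 .apply(transform)
 .explain())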
To support roughly equivalent logic (a function of type pandas.core.frame.DataFrame -> pandas.core.frame.DataFrame) you'll have to switch to Spark 3.0.0 and use the MAP_ITER transformation.
In the latest preview version (3.0.0-preview2) you'll need a UDF:
@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        ...
        yield b

df.mapInPandas(transform)
and in the upcoming 3.0.0 release (SPARK-28264) just a plain function:
def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        # Some other transform stuff
        ...
        yield b

df.mapInPandas(transform, b_schema)
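Applied to the pipeline from the question, that would look roughly like the sketch below (assuming the final 3.0.0 API and the question's sql, a_path, b_path and b_schema). Since there is no groupBy, no pid column and no shuffle are involved:
def transform(dfs):
    for df in dfs:
        # Some other transform stuff producing rows matching b_schema
        ...
        yield df

(sql
 .read.parquet(a_path)
 .mapInPandas(transform, b_schema)
 .write.parquet(b_path))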
A possible workaround on 2.x would be to use a plain SCALAR UDF, serialize each row of the result as JSON, and deserialize it on the other side, i.e.
import json

import pandas as pd
from pyspark.sql.functions import from_json, pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
    b = pd.DataFrame({"x": col1, "y": col2})
    # Some other transform stuff
    ...
    # Serialize each row of the result as a JSON string
    return b.apply(lambda x: json.dumps(dict(zip(b.columns, x))), axis=1)

(df
 .withColumn("json_result", transform("col1", "col2"))
 .withColumn("a_struct", from_json("json_result", b_schema)))