Pyspark SQL Pandas Grouped Map without GroupBy?

I have a dataset that I want to map over using several PySpark SQL Grouped Map UDFs, at different stages of a larger ETL process that runs on ephemeral clusters in AWS EMR. The Grouped Map API requires the PySpark dataframe to be grouped before the apply, but I don't actually need to group on any key.

At the moment, I'm using an arbitrary grouping, which works, but results in:

  1. An unnecessary shuffle.

  2. Hacky code for an arbitrary groupby in each job.

My ideal solution would allow a vectorized Pandas UDF apply without an arbitrary grouping, but failing that, even keeping the arbitrary grouping while eliminating the shuffle would be an improvement.
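
Roughly, the arbitrary grouping hack looks something like the sketch below (illustrative only; the gid column, the modulus, and a_grouped_map_udf are placeholders, not my actual code):

from pyspark.sql import functions as F

# Placeholder sketch: attach a synthetic, meaningless key purely to satisfy
# the Grouped Map API; groupBy still introduces an Exchange (shuffle).
(df
  .withColumn("gid", F.monotonically_increasing_id() % 100)
  .groupBy("gid")
  .apply(a_grouped_map_udf))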

EDIT:

Here's what my code looks like. I was originally using an arbitrary grouping, but am currently trying spark_partition_id() based on a comment below by @pault.


from pyspark.sql.functions import pandas_udf, PandasUDFType, spark_partition_id

@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
  # Drop the synthetic partition-id column before the real work.
  b = a_partition.drop("pid", axis=1)
  # Some other transform stuff
  return b

(sql
  .read.parquet(a_path)
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

Using spark_partition_id() seems to still result in a shuffle. I get the following DAG:

Stage 1

  1. Scan parquet
  2. Project
  3. Project
  4. Exchange

Stage 2

  1. Exchange
  2. Sort
  3. FlatMapGroupsInPandas
asked Nov 06 '19 by Dave


People also ask

What does PySpark groupBy() do?

Similar to the SQL GROUP BY clause, the PySpark groupBy() function collects identical values into groups on a DataFrame so that aggregations such as count, sum, avg, min, and max can be computed per group.
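
For example, a minimal groupBy aggregation (the column names below are illustrative):

from pyspark.sql import functions as F

# Illustrative only: group rows by "dept" and aggregate "salary".
(df.groupBy("dept")
   .agg(F.count("*").alias("n"),
        F.avg("salary").alias("avg_salary"),
        F.max("salary").alias("max_salary"))
   .show())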

Is PySpark always faster than pandas?

Due to parallel execution across all cores of multiple machines, PySpark runs operations faster than Pandas on large datasets, which is why Pandas DataFrames are often converted to PySpark (Spark with Python) DataFrames for better performance. This is one of the major differences between Pandas and PySpark DataFrames.
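
For reference, converting between the two is a one-liner in each direction (a sketch assuming an existing SparkSession named spark):

import pandas as pd

# Illustrative only: move a small pandas DataFrame into Spark and back.
pdf = pd.DataFrame({"x": [1, 2, 3]})
sdf = spark.createDataFrame(pdf)   # pandas -> PySpark
pdf2 = sdf.toPandas()              # PySpark -> pandas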

How do you use a pandas UDF in PySpark?

Pandas UDFs are user-defined functions that Spark executes using Arrow to transfer data and pandas to operate on it, which allows vectorized operations. A Pandas UDF is defined by using pandas_udf as a decorator or by wrapping a function with it; no additional configuration is required.
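
A minimal scalar example (Spark 2.x-style API; the column names are illustrative):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Illustrative only: a scalar pandas UDF receives and returns a pandas Series.
@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(s):
    return s + 1

df.withColumn("y", plus_one("x"))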

Is a pandas UDF faster?

A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.
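
To make the contrast concrete, here is the same trivial logic written both ways (a sketch; actual speedups depend on the workload):

from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

# Row-at-a-time UDF: Python is invoked once per row.
@udf("double")
def plus_one_slow(x):
    return x + 1.0

# Vectorized pandas UDF: Python is invoked once per Arrow batch (a pandas Series).
@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one_fast(s):
    return s + 1.0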


1 Answer

To support roughly equivalent logic (a function of type pandas.core.frame.DataFrame -> pandas.core.frame.DataFrame) you'll have to switch to Spark 3.0.0 and use the MAP_ITER transformation.

In the latest preview version (3.0.0-preview2) you'll need a UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
    # dfs is an iterator of pandas DataFrames, one per batch.
    for df in dfs:
        b = df.drop("pid", axis=1)
        ...
        yield b

df.mapInPandas(transform)

and in the upcoming 3.0.0 release (SPARK-28264) just a plain function:

def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        # Some other transform stuff
        ...
        yield b

df.mapInPandas(transform, b_schema)

A possible workaround on 2.x would be to use a plain SCALAR UDF, serialize each row of the result as JSON, and deserialize it on the other side, i.e.:

import json

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType, from_json

@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
    b = pd.DataFrame({"x": col1, "y": col2})
    ...
    # Serialize each result row as a JSON string.
    return b.apply(lambda x: json.dumps(dict(zip(b.columns, x))), axis=1)


(df
    .withColumn("json_result", transform("col1", "col2"))
    .withColumn("a_struct", from_json("json_result", b_schema)))
answered Oct 25 '22 by 10465355