 

pyspark approxQuantile function

I have a dataframe with these columns: id, price, timestamp.

I would like to find the median value grouped by id.

I am using this code to find it, but it gives me the error shown below:

from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

Is it not possible to use DataFrameStatFunctions to fill values in a new column?
asked Jul 24 '17 by BK C.


People also ask

What is approxQuantile in PySpark?

It calculates the approximate quantiles of numerical columns of a DataFrame.
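For reference, approxQuantile is a DataFrame method (also reachable via df.stat) and returns plain Python floats rather than a Column; a minimal sketch, assuming the question's df with a price column:

quantiles = df.approxQuantile("price", [0.25, 0.5, 0.75], 0.01)
# quantiles is an ordinary Python list, e.g. [2.0, 3.0, 5.0],
# so it cannot be combined with .over(windowSpec) like a Column expression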

How do you find the percentile of a column in PySpark?

Use percentile_approx. It returns the approximate percentile of the numeric column col, i.e. the smallest value in the ordered col values (sorted from least to greatest) such that no more than percentage of the col values is less than or equal to that value. The value of percentage must be between 0.0 and 1.0.
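A minimal sketch of percentile_approx used as a grouped aggregate, assuming the question's df with id and price columns:

from pyspark.sql import functions as F

df.groupBy("id").agg(
    F.percentile_approx("price", 0.5).alias("median_price")
).show()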

How do you get deciles in PySpark?

First compute the percent_rank, then multiply it by 10 and take the upper integer (ceiling). Consequently, all values with a percent_rank between 0 and 0.1 fall into decile 1, all values with a percent_rank between 0.1 and 0.2 fall into decile 2, and so on.
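A sketch of that recipe, assuming the question's df ordered by price within each id:

from pyspark.sql import Window, functions as F

w = Window.partitionBy("id").orderBy("price")
deciles = df.withColumn("decile", F.ceil(F.percent_rank().over(w) * 10))
# note: the smallest row in each partition has percent_rank 0, so it lands in
# decile 0 with this formula; clamp it or use F.ntile(10).over(w) if you need 1-10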

What is col() in PySpark?

col(col: str) → pyspark.sql.column.Column: returns a Column based on the given column name.
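For example, with the question's df:

from pyspark.sql.functions import col

df.select(col("price") * 2)   # col("price") is a Column expression, usable in select/filter/withColumn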


2 Answers

Since PySpark 3.1.0, the percentile_approx function has been available, and it solves this problem.

When called with a list of percentages, percentile_approx returns an array, so you need to take its first element.

As in:

import pyspark.sql.functions as F
from pyspark.sql import Window

windowSpec = Window.partitionBy("id")
df.withColumn("Median", F.percentile_approx("price", [0.5]).over(windowSpec)[0])
answered Sep 19 '22 by ionathan


If you are fine with an aggregation instead of a window function, there is also the option to use a pandas_udf. It is not as fast as pure Spark functions, though. Here is an adapted example from the docs:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
)

# grouped-aggregate pandas UDF: each group's prices arrive as a pandas Series
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()

df.groupby("id").agg(median_udf(df["price"])).show()
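On recent PySpark versions, the same grouped-aggregate UDF can be declared with type hints instead of the (now deprecated) PandasUDFType; a sketch, assuming pandas is importable on the workers:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def median_udf(v: pd.Series) -> float:
    return v.median()

df.groupby("id").agg(median_udf(df["price"])).show()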
answered Sep 17 '22 by 00schneider