I have a DataFrame with the columns id, price, and timestamp.

I would like to find the median price grouped by id.

I am using this code to find it, but it's giving me an error:
from pyspark.sql import DataFrameStatFunctions as statFunc

windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price", [0.5], 0).over(windowSpec)
return df.withColumn("Median", median)
However, it raises:

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

Is it not possible to use DataFrameStatFunctions to fill values in a new column?
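As a side note, approxQuantile would not work here even without that error: it is an action on the DataFrame (also reachable via df.stat) that immediately returns a plain Python list with one float per requested quantile, not a Column, so there is nothing to chain .over() onto. For example:

# approxQuantile runs over the whole DataFrame and returns a list of floats
quantiles = df.approxQuantile("price", [0.5], 0.25)
median_overall = quantiles[0]  # a plain Python float, not a Column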
Since PySpark 3.1.0, the percentile_approx function has been available, which solves this problem. Per the docs, it returns the approximate percentile of the numeric column col: the smallest value in the ordered col values (sorted from least to greatest) such that no more than percentage of col values is less than or equal to that value, where percentage must be between 0.0 and 1.0. When called with a list of percentages, percentile_approx returns an array, so you need to slice out the first element. As in:
from pyspark.sql import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy("id")
df.withColumn("Median", F.percentile_approx("price", [0.5]).over(windowSpec)[0])
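If you only need a single quantile, passing a scalar percentage avoids the array and the slicing, and a plain groupBy aggregation works as well; a variant using the same columns as above:

# scalar percentage: yields a double column directly, no [0] needed
df.withColumn("Median", F.percentile_approx("price", 0.5).over(windowSpec))

# or one row per id instead of a window
df.groupBy("id").agg(F.percentile_approx("price", 0.5).alias("Median"))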
If you are fine with an aggregation instead of a window function, there is also the option to use a pandas_udf. It is not as fast as pure Spark, though. Here is an adapted example from the docs:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
)

# grouped-aggregate UDF: receives each group's prices as a pandas Series
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()

df.groupby("id").agg(median_udf(df["price"])).show()
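Note that since Spark 3.0 the PandasUDFType style is deprecated in favour of type hints; a sketch of the same UDF in the newer style, assuming the df defined above:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def median_udf(v: pd.Series) -> float:  # Series-to-scalar: a grouped-aggregate UDF
    return v.median()

df.groupby("id").agg(median_udf(df["price"])).show()
# id=1 -> 1.5, id=2 -> 5.0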