
Deciles or other quantile rank for Pyspark column

Tags:

pyspark

I have a PySpark DataFrame with multiple numeric columns, and for each column I want to calculate the decile (or other quantile) rank of each row based on that variable.

This is simple in pandas: we can create a new column for each variable with pd.qcut(x, q=n), which assigns each row a bucket label from 0 to n-1.
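For reference, here is a minimal pandas sketch of what I mean (the toy values below are just an illustration, not my real data):

import pandas as pd

# pandas version: label each row with its tercile (0, 1, or 2) based on var1
pdf = pd.DataFrame({"var1": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
pdf["var1_tercile"] = pd.qcut(pdf["var1"], q=3, labels=False)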

How can this be done in PySpark? I have tried the following, but clearly the break points are not unique between these thirds. I want the lower 1/3 of the data assigned 1, the next 1/3 assigned 2, and the top 1/3 assigned 3, and I want to be able to change this granularity later (for example to 1/10, 1/32, etc.).

from pyspark.sql import functions as F
from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

w = Window.partitionBy(data.var1).orderBy(data.var1)
d2 = df.select(
    "var1",
    ntile(3).over(w).alias("ntile3")
)

agged = d2.groupby('ntile3').agg(
    F.min("var1").alias("min_var1"),
    F.max("var1").alias("max_var1"),
    F.count('*')
)
agged.show()

+------+--------+--------+--------+
|ntile3|min_var1|max_var1|count(1)|
+------+--------+--------+--------+
|     1|     0.0|   210.0|  517037|
|     3|     0.0|   206.0|  516917|
|     2|     0.0|   210.0|  516962|
+------+--------+--------+--------+
asked Dec 23 '22 by B_Miner

2 Answers

QuantileDiscretizer from 'pyspark.ml.feature' can be used.

from pyspark.ml.feature import QuantileDiscretizer

values = [(0.1,), (0.4,), (1.2,), (1.5,)]
df = spark.createDataFrame(values, ["values"])

qds = QuantileDiscretizer(numBuckets=2, inputCol="values", outputCol="buckets",
                          relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)          # fitting returns a Bucketizer holding the learned splits
bucketizer.setHandleInvalid("skip").transform(df).show()

+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    1.0|
|   1.2|    1.0|
|   1.5|    1.0|
+------+-------+
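Applied to the question's data, the same approach might look like the sketch below. Here df and var1 stand in for the asker's DataFrame and column, so treat this as an untested adaptation rather than part of the original answer:

from pyspark.ml.feature import QuantileDiscretizer

# assumption: df is the asker's DataFrame and var1 is one of its numeric columns
qds = QuantileDiscretizer(numBuckets=10, inputCol="var1", outputCol="var1_decile",
                          relativeError=0.001)
model = qds.fit(df)
print(model.getSplits())                 # the learned decile boundaries
df_with_deciles = model.transform(df)    # adds var1_decile with values 0.0 .. 9.0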
answered Jan 03 '23 by pauli


You can use percent_rank from pyspark.sql.functions with a window function. For instance, to compute deciles you can do:

from pyspark.sql.window import Window
from pyspark.sql.functions import ceil, percent_rank

w = Window.orderBy(data.var1)
data.select('*', ceil(10 * percent_rank().over(w)).alias("decile"))

By doing so you first compute the percent_rank, then multiply it by 10 and take the ceiling. Consequently, all values with a percent_rank in (0, 0.1] end up in decile 1, values in (0.1, 0.2] in decile 2, and so on. Note that the single lowest row has a percent_rank of exactly 0, so it gets a ceiling of 0 unless you handle it separately.
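A slightly fuller sketch of the same idea, parameterised on the number of buckets (the data and var1 names follow the question, and the helper below is hypothetical rather than part of the original answer):

from pyspark.sql.window import Window
from pyspark.sql.functions import ceil, greatest, lit, percent_rank

def add_quantile_rank(df, col, n, out_col=None):
    # adds a 1..n quantile-rank column based on percent_rank over `col`
    out_col = out_col or "{}_qrank{}".format(col, n)
    w = Window.orderBy(col)
    # greatest(..., 1) folds the single percent_rank == 0 row into bucket 1
    return df.withColumn(out_col, greatest(ceil(n * percent_rank().over(w)), lit(1)))

# thirds as in the question; use n=10 for deciles
data_with_thirds = add_quantile_rank(data, "var1", 3)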

answered Jan 03 '23 by Thomas