I just tried doing a countDistinct
over a window and got this error:
AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)
Is there a way to do a distinct count over a window in pyspark?
Here's some example code:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                            (13, "2017-03-15T12:27:18+00:00", "red"),
                            (25, "2017-03-18T11:27:18+00:00", "red")],
                           ["dollars", "timestampGMT", "color"])

df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()
This is the output I'd like to see:
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
EDIT:
As noleto mentions in his answer below, since PySpark 2.1 there is an approx_count_distinct function that works over a window.
Original Answer
I figured out that I can use a combination of the collect_set and size functions to mimic the functionality of countDistinct over a window:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                            (13, "2017-03-15T12:27:18+00:00", "red"),
                            (25, "2017-03-18T11:27:18+00:00", "red")],
                           ["dollars", "timestampGMT", "color"])

#convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()
This results in the distinct count of color over the previous week of records:
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
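If you need the distinct count per group rather than across the whole DataFrame, the same trick should work with a partitioned window. A minimal sketch, assuming a hypothetical grouping column called store that is not in the example data above:

#same collect_set/size trick, but partitioned by a hypothetical "store" column
#("store" is not in the example data above -- substitute your own grouping key)
w_by_store = (Window.partitionBy("store")
              .orderBy(F.col("timestampGMT").cast('long'))
              .rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_per_store_last_week',
                   F.size(F.collect_set("color").over(w_by_store)))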
@Bob Swain's answer is nice and works! Since then, as of Spark 2.1, Spark offers an equivalent to countDistinct, approx_count_distinct, which is more efficient to use and, most importantly, supports counting distinct over a window.

Here is the code for the drop-in replacement:
#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week',
                   F.approx_count_distinct("color").over(w))
For columns with small cardinalities, the result is supposed to be the same as countDistinct. When the dataset grows a lot, you should consider adjusting the parameter rsd (the maximum estimation error allowed), which lets you tune the precision/performance trade-off.
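For example, passing an explicit rsd looks like the sketch below (the default is 0.05; smaller values cost more memory and CPU):

#approx_count_distinct with an explicit maximum estimation error (default rsd is 0.05);
#a smaller rsd gives a more accurate count at the cost of more resources
df = df.withColumn('distinct_color_count_over_the_last_week',
                   F.approx_count_distinct("color", rsd=0.01).over(w))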