 

pyspark: count distinct over a window

I just tried doing a countDistinct over a window and got this error:

AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)

Is there a way to do a distinct count over a window in pyspark?

Here's some example code:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])

df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

This is the output I'd like to see:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
asked Aug 24 '17 by Bob Swain


People also ask

How do you count distinct in PySpark?

In PySpark, there are two ways to get the count of distinct values. We can use the distinct() and count() functions of DataFrame to get the distinct count of a PySpark DataFrame. Another way is to use the SQL countDistinct() function, which returns the distinct value count of all the selected columns.
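
A minimal sketch of both approaches, assuming an active SparkSession named spark and a small toy DataFrame (df2 is illustrative, not from the question):

from pyspark.sql import functions as F

#toy data: three rows, two distinct colors
df2 = spark.createDataFrame([(17, "orange"), (13, "red"), (25, "red")],
                            ["dollars", "color"])

#way 1: distinct() followed by count()
n1 = df2.select("color").distinct().count()

#way 2: the countDistinct() aggregate function
n2 = df2.select(F.countDistinct("color")).collect()[0][0]

print(n1, n2)  #both print 2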

How do you display the count of DataFrame in PySpark?

For counting the number of rows, we use the count() function, df.count(), which returns the number of rows in the DataFrame and can be stored in a variable named 'row'. For counting the number of columns, we take the length of the column list with len(df.columns).
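
A short sketch, reusing the toy DataFrame df2 from the previous snippet:

#number of rows
row = df2.count()

#number of columns
col = len(df2.columns)

print(row, col)  #prints 3 2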

What is Ntile PySpark?

ntile(n) is a window function that returns the ntile group id (from 1 to n inclusive) in an ordered window partition. For example, if n is 4, the first quarter of the rows will get value 1, the second quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
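
A small illustration, again using the toy DataFrame df2 (the ordering column is just an example):

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#split the rows into 2 buckets, ordered by dollars
w2 = Window.orderBy("dollars")
df2.withColumn("bucket", F.ntile(2).over(w2)).show()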


2 Answers

EDIT:

As noleto mentions in his answer below, since PySpark 2.1 there is an approx_count_distinct function that works over a window.


Original Answer

I figured out that I can use a combination of the collect_set and size functions to mimic the functionality of countDistinct over a window:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])

#convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()

This results in the distinct count of color over the previous week of records:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
answered by Bob Swain


@Bob Swain's answer is nice and works! Since version 2.1, Spark offers an equivalent to the countDistinct function, approx_count_distinct, which is more efficient to use and, most importantly, supports counting distinct over a window.

Here is the code for the drop-in replacement:

#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))

For columns with small cardinality, the result is supposed to be the same as with countDistinct. When the dataset grows a lot, you should consider adjusting the parameter rsd (the maximum estimation error allowed), which lets you tune the precision/performance trade-off.
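
For example, a sketch of requesting a tighter estimate than the default rsd of 0.05 (the 0.01 value here is purely illustrative):

#smaller rsd means a more precise but more expensive estimate
df = df.withColumn('distinct_color_count_over_the_last_week',
                   F.approx_count_distinct("color", rsd=0.01).over(w))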

answered by noleto