Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Mode of grouped data in (py)Spark

I have a spark DataFrame with multiple columns. I would like to group the rows based on one column, and then find the mode of the second column for each group. Working with a pandas DataFrame, I would do something like this:

rand_values = np.random.randint(max_value,
                                size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')

print(rand_values)
##    x  y
## 0  0  0
## 1  0  4
## 2  0  1
## 3  1  1
## 4  1  2

def mode(series):
    return scipy.stats.mode(series['y'])[0][0]

rand_values.groupby('x').apply(mode)
## x
## 0    4
## 1    1
## dtype: int64

Within pyspark, I am able to find the mode of a single column doing

df = sql_context.createDataFrame(rand_values)

def mode_spark(df, column):
    # Group by column and count the number of occurrences
    # of each x value
    counts = df.groupBy(column).count()

    # - Find the maximum value in the 'counts' column
    # - Join with the counts dataframe to select the row
    #   with the maximum count
    # - Select the first element of this dataframe and
    #   take the value in column
    mode = counts.join(
        counts.agg(F.max('count').alias('count')),
        on='count'
    ).limit(1).select(column)

    return mode.first()[column]

mode_spark(df, 'x')
## 1
mode_spark(df, 'y')
## 1

I'm at a loss for how to apply that function to grouped data. If it's not possible to directly apply this logic to a DataFrame, is it possible to achieve the same effect by some other means?

Thank you in advance!

like image 512
bjack3 Avatar asked Apr 15 '16 18:04

bjack3


1 Answers

Solution suggested by zero323.

Original solution: https://stackoverflow.com/a/35226857/1560062

First, count the occurances of each (x, y) combination.

counts = df.groupBy(['x', 'y']).count().alias('counts')
counts.show()
## +---+---+-----+
## |  x|  y|count|
## +---+---+-----+
## |  0|  1|    2|
## |  0|  3|    2|
## |  0|  4|    2|
## |  1|  1|    3|
## |  1|  3|    1|
## +---+---+-----+

Solution 1: Group by 'x', aggregate by taking the maximum value of the counts in each group. Finally, Drop the 'count' column.

result = (counts
          .groupBy('x')
          .agg(F.max(F.struct(F.col('count'),
                              F.col('y'))).alias('max'))
          .select(F.col('x'), F.col('max.y'))
         )
result.show()
## +---+---+
## |  x|  y|
## +---+---+
## |  0|  4|
## |  1|  1|
## +---+---+

Solution 2: Using a window, partition by 'x', and order by the 'count' column. Now, pick the first row in each of the partitions.

win = Window().partitionBy('x').orderBy(F.col('count').desc())
result = (counts
          .withColumn('row_num', F.rowNumber().over(win))
          .where(F.col('row_num') == 1)
          .select('x', 'y')
         )
result.show()
## +---+---+
## |  x|  y|
## +---+---+
## |  0|  1|
## |  1|  1|
## +---+---+

The two results have a different outcome because of the way the rows are sorted. If there are no ties, the two methods give the same result.

like image 93
bjack3 Avatar answered Sep 27 '22 20:09

bjack3