I have a Spark DataFrame with multiple columns. I would like to group the rows by one column and then find the mode of a second column for each group. Working with a pandas DataFrame, I would do something like this:
import numpy as np
import pandas as pd
import scipy.stats

max_value = 5    # example parameters for the random data
num_values = 10

rand_values = np.random.randint(max_value,
                                size=num_values).reshape((num_values // 2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value / 2
rand_values['x'] = rand_values['x'].astype('int32')
print(rand_values)
## x y
## 0 0 0
## 1 0 4
## 2 0 1
## 3 1 1
## 4 1 2
def mode(series):
    return scipy.stats.mode(series['y'])[0][0]

rand_values.groupby('x').apply(mode)
## x
## 0 4
## 1 1
## dtype: int64
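As an aside, pandas can produce the same result without scipy via Series.mode; a minimal equivalent (ties resolve to the smallest value, as with scipy.stats.mode):
rand_values.groupby('x')['y'].agg(lambda s: s.mode().iloc[0])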
Within PySpark, I am able to find the mode of a single column like this:
import pyspark.sql.functions as F

df = sql_context.createDataFrame(rand_values)

def mode_spark(df, column):
    # Group by the column and count the number of
    # occurrences of each value
    counts = df.groupBy(column).count()
    # - Find the maximum value in the 'count' column
    # - Join with the counts dataframe to select the
    #   row with the maximum count
    # - Take the first row of this dataframe and pull
    #   out the value in column
    mode = counts.join(
        counts.agg(F.max('count').alias('count')),
        on='count'
    ).limit(1).select(column)
    return mode.first()[column]
mode_spark(df, 'x')
## 1
mode_spark(df, 'y')
## 1
I'm at a loss for how to apply that function to grouped data. If it's not possible to directly apply this logic to a DataFrame, is it possible to achieve the same effect by some other means?
Thank you in advance!
Solution suggested by zero323.
Original solution: https://stackoverflow.com/a/35226857/1560062
First, count the occurrences of each (x, y) combination.
counts = df.groupBy(['x', 'y']).count().alias('counts')
counts.show()
## +---+---+-----+
## | x| y|count|
## +---+---+-----+
## | 0| 1| 2|
## | 0| 3| 2|
## | 0| 4| 2|
## | 1| 1| 3|
## | 1| 3| 1|
## +---+---+-----+
Solution 1: Group by 'x' and aggregate by taking the maximum of a struct of ('count', 'y') within each group. Finally, select 'x' and the 'y' field of that struct.
result = (counts
          .groupBy('x')
          .agg(F.max(F.struct(F.col('count'),
                              F.col('y'))).alias('max'))
          .select(F.col('x'), F.col('max.y')))
result.show()
## +---+---+
## | x| y|
## +---+---+
## | 0| 4|
## | 1| 1|
## +---+---+
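This works because Spark compares structs field by field, so F.max over struct('count', 'y') picks the row with the highest count and breaks ties by the larger y. A minimal sketch to see the ordering (the demo DataFrame is made up for illustration):
demo = sql_context.createDataFrame([(2, 1), (2, 4), (3, 0)], ['count', 'y'])
# Struct comparison is lexicographic: (3, 0) > (2, 4) > (2, 1),
# so the highest count wins regardless of y
demo.agg(F.max(F.struct('count', 'y')).alias('m')).first()['m']
## Row(count=3, y=0)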
Solution 2: Using a window, partition by 'x' and order by the 'count' column in descending order. Then pick the first row in each partition.
from pyspark.sql import Window

# Note: F.rowNumber() from older Spark versions is now F.row_number()
win = Window.partitionBy('x').orderBy(F.col('count').desc())
result = (counts
          .withColumn('row_num', F.row_number().over(win))
          .where(F.col('row_num') == 1)
          .select('x', 'y'))
result.show()
## +---+---+
## | x| y|
## +---+---+
## | 0| 1|
## | 1| 1|
## +---+---+
The two results differ because of how ties are broken: for x = 0 the counts of y = 1, 3, 4 are all tied at 2, and Solution 1 resolves the tie by taking the largest y, while Solution 2 keeps whichever tied row the window happens to order first. If there are no ties, the two methods give the same result.
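If a deterministic result is wanted from Solution 2, a secondary sort key can break ties explicitly; for example, ordering ties by y descending (a sketch that matches Solution 1's tie-breaking):
win = Window.partitionBy('x').orderBy(F.col('count').desc(),
                                      F.col('y').desc())
result = (counts
          .withColumn('row_num', F.row_number().over(win))
          .where(F.col('row_num') == 1)
          .select('x', 'y'))
With this ordering, both solutions return y = 4 for x = 0 and y = 1 for x = 1.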