I would like to group by a value and then find the max value in each group using PySpark. I have the following code, but now I am a bit stuck on how to extract the max value.
# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')
# Create the triplet so I index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Here is where I am stuck
group_list = grouped.map(lambda x: (list(x[1]))) #?
Returns something like:
[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]]
I now want to find the max 'occurrences' for each user. The final result after taking the max would be an RDD that looks like this:
[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]]
Where only the max row would remain for each of the users in the file. In other words, I want to reduce the RDD so it contains only a single triplet per user: the one with that user's max occurrences.
In PySpark, the maximum row per group can also be selected on a DataFrame, using Window.partitionBy() and running row_number() over the window partition; a DataFrame example is sketched below.
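Here is a minimal sketch of that approach. It assumes the Spark 2.x DataFrame API and hypothetical column names user, item and occurrences; the sample rows simply mirror the data from the question.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("max-per-group").getOrCreate()

# Hypothetical DataFrame mirroring the ('user', 'item', 'occurrences') triplets
df = spark.createDataFrame(
    [(u'u1', u's1', 20.0), (u'u1', u's2', 5.0),
     (u'u2', u's3', 5.0), (u'u2', u's2', 10.0)],
    ['user', 'item', 'occurrences'])

# Number the rows within each user by descending occurrences,
# then keep only the first (i.e. largest) row per user
w = Window.partitionBy('user').orderBy(F.col('occurrences').desc())
max_per_user = (df
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .drop('rn'))

max_per_user.show()
## +----+----+-----------+
## |user|item|occurrences|
## +----+----+-----------+
## |  u1|  s1|       20.0|
## |  u2|  s2|       10.0|
## +----+----+-----------+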
There is no need for groupBy here. A simple reduceByKey would do just fine and will usually be more efficient, since it combines values for each key map-side before shuffling instead of moving every record across the network:
data_file = sc.parallelize([
    (u'u1', u's1', 20), (u'u1', u's2', 5),
    (u'u2', u's3', 5), (u'u2', u's2', 10)])

max_by_group = (data_file
    .map(lambda x: (x[0], x))  # Convert to a pairwise RDD keyed by user
    # Take the maximum of the two arguments by their last element (the occurrence count),
    # equivalent to:
    # lambda x, y: x if x[-1] > y[-1] else y
    .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
    .values())  # Drop keys

max_by_group.collect()
## [('u2', 's2', 10), ('u1', 's1', 20)]
I think I found the solution:
from pyspark import SparkContext, SparkConf

def reduce_by_max(rdd):
    """
    Helper function to find the max value in a list of values i.e. triplets.
    """
    max_val = rdd[0][2]
    the_index = 0
    for idx, val in enumerate(rdd):
        if val[2] > max_val:
            max_val = val[2]
            the_index = idx
    return rdd[the_index]

conf = SparkConf() \
    .setAppName("Collaborative Filter") \
    .set("spark.executor.memory", "5g")
sc = SparkContext(conf=conf)

# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')
# Create the triplet so I can index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Get the values as a list
group_list = grouped.map(lambda x: (list(x[1])))
# Get the max value for each user.
max_list = group_list.map(reduce_by_max).collect()
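As a quick sanity check, here is a sketch that exercises the same helper on an in-memory RDD instead of a real input file, reusing the sample triplets from the reduceByKey answer:
sample = sc.parallelize([
    (u'u1', u's1', 20.0), (u'u1', u's2', 5.0),
    (u'u2', u's3', 5.0), (u'u2', u's2', 10.0)])

max_list = (sample
    .groupBy(lambda r: r[0])      # Group triplets by user
    .map(lambda x: list(x[1]))    # Materialize each group as a list
    .map(reduce_by_max)           # Keep the triplet with the largest occurrence
    .collect())
## [(u'u1', u's1', 20.0), (u'u2', u's2', 10.0)]  (order may vary)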