pyspark: groupBy and then get max value of each group

I would like to group by a value and then find the max value in each group using PySpark. I have the following code but now I am a bit stuck on how to extract the max value.

# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')
# Create the triplet so I can index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Here is where I am stuck 
group_list = grouped.map(lambda x: (list(x[1]))) #?

This returns something like:

[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]]

I now want to find the max 'occurrences' for each user. The final result after taking the max would be an RDD that looks like this:

[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]]

Only the max record would remain for each user in the file. In other words, I want the RDD to contain only a single triplet per user: that user's max occurrences.

asked Nov 15 '15 by user985030

People also ask

How do you find the max value in PySpark?

In PySpark, the maximum (max) row per group can be selected using the Window.partitionBy() function and running the row_number() function over the window partition, as in the DataFrame sketch below.
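A minimal sketch of that window approach, assuming a DataFrame df with the question's user, item, and occurrences columns (the DataFrame itself is hypothetical):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank rows within each user partition by descending occurrences
w = Window.partitionBy("user").orderBy(F.col("occurrences").desc())

# Keep only the top-ranked row per user
max_per_user = (df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))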

How do you do multiple aggregate functions in PySpark?

A groupby aggregate on multiple columns in PySpark can be performed by passing two or more columns to the groupBy() function and then calling agg(). For example, you can group on department and state columns and use the count() function within agg(), as sketched below.
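A hedged sketch of that pattern, assuming a hypothetical DataFrame df with department, state, and salary columns:

from pyspark.sql import functions as F

# Group on two columns, then compute several aggregates in one agg() call
agg_df = (df
    .groupBy("department", "state")
    .agg(F.count("*").alias("count"),
         F.sum("salary").alias("total_salary")))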

How do you do a groupBy count in PySpark?

PySpark groupBy count is used to get the number of records for each group. To perform the count, first call groupBy() on the DataFrame, which groups the records based on one or more column values, and then call count() to get the number of records in each group.
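For example, counting records per user on a DataFrame built from the question's triplets (a sketch; df is assumed):

# One row per user with the number of records in that group
counts = df.groupBy("user").count()
counts.show()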

What does PySpark groupBy () do?

Similar to the SQL GROUP BY clause, the PySpark groupBy() function is used to collect identical data into groups on a DataFrame and perform count, sum, avg, min, and max functions on the grouped data.
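A short sketch combining several of those aggregates, again assuming the question's user and occurrences columns on a hypothetical df:

from pyspark.sql import functions as F

# min, max, and average occurrences per user in a single pass
summary = df.groupBy("user").agg(
    F.min("occurrences").alias("min_occ"),
    F.max("occurrences").alias("max_occ"),
    F.avg("occurrences").alias("avg_occ"))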


2 Answers

There is no need for groupBy here. A simple reduceByKey would do just fine and most of the time will be more efficient, since it combines values map-side before shuffling instead of materializing whole groups:

data_file = sc.parallelize([
   (u'u1', u's1', 20), (u'u1', u's2', 5),
   (u'u2', u's3', 5), (u'u2', u's2', 10)])

max_by_group = (data_file
  .map(lambda x: (x[0], x))  # Convert to a pairwise RDD keyed by user
  # Take the maximum of the two tuples, comparing by the last
  # element (the occurrence count); equivalent to:
  # lambda x, y: x if x[-1] > y[-1] else y
  .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
  .values()) # Drop keys

max_by_group.collect()
## [('u2', 's2', 10), ('u1', 's1', 20)]
answered Oct 12 '22 by zero323

I think I found the solution:

from pyspark import SparkContext, SparkConf

def reduce_by_max(rdd):
    """
    Helper function to find the max value in a list of values i.e. triplets. 
    """
    max_val = rdd[0][2]
    the_index = 0

    for idx, val in enumerate(rdd):
        if val[2] > max_val:
            max_val = val[2]
            the_index = idx

    return rdd[the_index]

conf = SparkConf() \
    .setAppName("Collaborative Filter") \
    .set("spark.executor.memory", "5g")
sc = SparkContext(conf=conf)

# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')

# Create the triplet so I can index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))

# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])

# Get the values as a list
group_list = grouped.map(lambda x: (list(x[1]))) 

# Get the max value for each user. 
max_list = group_list.map(reduce_by_max).collect()
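
As a side note, the whole helper can be replaced by Python's built-in max with a key function, which picks the same triplet:

# Equivalent to reduce_by_max: keep the triplet with the largest occurrence count
max_list = group_list.map(lambda vals: max(vals, key=lambda t: t[2])).collect()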
answered Oct 12 '22 by user985030