
PySpark: display max value(s) and multiple sorting

I would be grateful for some help here. I am using PySpark (I cannot use SQL). I have a list of tuples stored as a pair RDD:

[(('City1', '2020-03-27', 'X1'), 44),
 (('City1', '2020-03-28', 'X1'), 44),
 (('City3', '2020-03-28', 'X3'), 15),
 (('City4', '2020-03-27', 'X4'), 5),
 (('City4', '2020-03-26', 'X4'), 4),
 (('City2', '2020-03-26', 'X2'), 14),
 (('City2', '2020-03-25', 'X2'), 4),
 (('City4', '2020-03-25', 'X4'), 1),
 (('City1', '2020-03-29', 'X1'), 1),
 (('City5', '2020-03-25', 'X5'), 15)]

For example, in the last pair, ('City5', '2020-03-25', 'X5') is the key and 15 is the value.

I would like to obtain the following outcome:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

Please notice that the outcome displays:

  • The key(s) with the max value for each city. This is the hardest part: the same city must be displayed twice if it reaches its max value on different dates. I assume I cannot use reduceByKey() because the key is not unique; maybe groupBy() or filter()?

  • Sorted in the following order:

  1. Value, descending
  2. Date, ascending
  3. City name, descending (e.g. City1)

So I have tried the following code:

res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])

Although it gives me the cities with their max value, it doesn't return the same city twice (because it reduces by key, i.e. the city) when a city has the same max value on 2 different dates.

I hope this is clear enough; please ask if anything needs clarifying. Thanks so much!

JohnDoe34 asked May 11 '26 06:05


1 Answer

To keep every row whose value equals the city's max value, you can still use reduceByKey, but over arrays of tuples instead of single tuples:

  • transform each row into a key/value pair, where the key is the city and the value is an array containing one tuple instead of a bare tuple
  • reduce by key with reduceByKey, merging the arrays if they hold the same value, otherwise keeping the array that has the larger value
  • flatten the value arrays with flatMap, re-attaching the key to each tuple
  • finally, perform the sort
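
The reduce step can be tried without a cluster. Below is a minimal plain-Python sketch, not the Spark code itself: `functools.reduce` over per-city groups stands in for reduceByKey, and the names `pairs`, `grouped` and `best` are illustrative only. Since reduceByKey expects an associative and commutative function, the order of tuples inside a merged array should not be relied on, which is one reason the final sort is needed.

```python
from collections import defaultdict
from functools import reduce

# Sample pairs copied from the question: ((city, date, code), value)
pairs = [
    (("City1", "2020-03-27", "X1"), 44),
    (("City1", "2020-03-28", "X1"), 44),
    (("City3", "2020-03-28", "X3"), 15),
    (("City4", "2020-03-27", "X4"), 5),
    (("City4", "2020-03-26", "X4"), 4),
    (("City2", "2020-03-26", "X2"), 14),
    (("City2", "2020-03-25", "X2"), 4),
    (("City4", "2020-03-25", "X4"), 1),
    (("City1", "2020-03-29", "X1"), 1),
    (("City5", "2020-03-25", "X5"), 15),
]


def merge(a, b):
    """Keep the list holding the larger value; concatenate both on a tie."""
    if a[0][2] > b[0][2]:
        return a
    if a[0][2] == b[0][2]:
        return a + b
    return b


# Local stand-in for map + reduceByKey: collect one-element lists per city,
# then fold each city's lists together with merge.
grouped = defaultdict(list)
for (city, date, code), value in pairs:
    grouped[city].append([(date, code, value)])

best = {city: reduce(merge, lists) for city, lists in grouped.items()}

for city, rows in best.items():
    print(city, rows)
```

City1 keeps two tuples because both dates carry the same max value, while every other city keeps one.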

Complete code would be as follows:

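def merge(array1, array2):
    # Each array holds (date, code, value) tuples that all share the same value,
    # so comparing the first tuple of each array is enough.
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        # Tie: keep the rows from both sides
        return array1 + array2
    else:
        return array2


# (city, [(date, code, value)])
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(merge)
# Back to one row per tuple: (city, code, date, value)
rdd4 = rdd3.flatMap(lambda x: [(x[0], y[1], y[0], y[2]) for y in x[1]])
# Value descending, then date ascending, then city name ascending
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))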

And then you can print your RDD:

for row in rdd5.collect():
    print(', '.join([row[0], row[1], row[2], str(row[3])]))

That, with your input, gives you the following output:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
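
One detail worth checking: the question asks for the city name in descending order as the third criterion, whereas the sort key used here orders it ascending. The sample data has no ties on value and date, so the output is the same either way. A string cannot simply be negated inside a tuple key, so here is a minimal plain-Python sketch of one possible approach, assuming the rows have already been collected to the driver. It relies on Python's sort being stable; the `rows` list and its City6 entry are hypothetical, added only to create a tie.

```python
# Rows shaped like the final RDD: (city, code, date, value).
# The City6 row is hypothetical, added so two cities tie on value and date.
rows = [
    ("City3", "X3", "2020-03-28", 15),
    ("City1", "X1", "2020-03-27", 44),
    ("City5", "X5", "2020-03-25", 15),
    ("City6", "X6", "2020-03-28", 15),
]

# Python's sort is stable, so sort by the lowest-priority key first
# (city name, descending), then by the higher-priority keys.
by_city_desc = sorted(rows, key=lambda r: r[0], reverse=True)
ordered = sorted(by_city_desc, key=lambda r: (-r[3], r[2]))

for row in ordered:
    print(", ".join(map(str, row)))
```

This is a driver-side sketch only; it does not assume anything about the stability of chained sortBy calls on an RDD.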
Vincent Doba answered May 12 '26 20:05


