 

PySpark groupby and max value selection

I have a PySpark dataframe like

 name   city     date
 satya  Mumbai  13/10/2016
 satya  Pune    02/11/2016
 satya  Mumbai  22/11/2016
 satya  Pune    29/11/2016
 satya  Delhi   30/11/2016
 panda  Delhi   29/11/2016
 brata  BBSR    28/11/2016
 brata  Goa     30/10/2016
 brata  Goa     30/10/2016

I need to find out the most preferred city for each name. The logic is: take a city as fav_city if it has the maximum number of occurrences for the aggregated 'name' + 'city' pair. If several cities share that maximum count, pick the one with the latest date. I will explain:

import pyspark.sql.functions as F

d = df.groupby('name','city').count()
#name  city  count
brata Goa    2  #clear favourite
brata BBSR   1
panda Delhi  1  #as single so clear favourite
satya Pune   2  ##confusion
satya Mumbai 2  ##confusion
satya Delhi  1  ##should be discarded, since other cities have a higher count

#So get cities having max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d,['name','count'],'left')
#name  count  city
 brata    2   Goa    #fav found
 panda    1   Delhi  #fav found
 satya    2   Mumbai #can't say
 satya    2   Pune   #can't say

In the case of user 'satya' I need to go back to the transaction history and, among the cities with the equal max count (i.e. 'Mumbai' and 'Pune'), take the one transacted last (max date) as fav_city. Here that is 'Pune', since '29/11/2016' is the latest/max date.

But I am not able to work out how to proceed from here.

Please help me with the logic, or suggest a better (faster/more compact) solution if there is one. Thanks.

asked Nov 30 '16 by Satya

2 Answers

First, convert the date column to DateType:

import pyspark.sql.functions as F

df_with_date = df.withColumn(
    "date",
    F.to_date("date", "dd/MM/yyyy")
    # For Spark < 2.2
    # F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)

Next, group by name and city, but extend the aggregation like this:

df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))

Define a window:

from pyspark.sql.window import Window

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))

Add rank:

df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))

And filter:

result = df_with_rank.where(F.col("rank") == 1)
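
With the sample data from the question, the kept rows should look roughly like this (row order may differ; dates are shown after the to_date conversion):

result.select("name", "city", "count", "max_date").show()
# +-----+-----+-----+----------+
# | name| city|count|  max_date|
# +-----+-----+-----+----------+
# |brata|  Goa|    2|2016-10-30|
# |panda|Delhi|    1|2016-11-29|
# |satya| Pune|    2|2016-11-29|
# +-----+-----+-----+----------+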

You can detect remaining duplicates using code like this:

import sys

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
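
On Spark 2.1 and later you can express the same whole-partition frame with the built-in Window boundary constants instead of sys.maxsize; a small equivalent variant, with the result assigned this time:

# Unbounded frame over each name partition, written with Window constants
final_w = (Window()
    .partitionBy("name")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result_with_tie = result.withColumn("tie", F.count("*").over(final_w) != 1)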
answered Sep 30 '22 by zero323


d = df.groupby('name','city').count()
#name  city  count
brata Goa    2  #clear favourite
brata BBSR   1
panda Delhi  1  #as single so clear favourite
satya Pune   2  ##Confusion
satya Mumbai 2  ##confusion
satya Delhi  1   ##shd be discard as other cities having higher count than this city

# Count the distinct cities recorded for each name, sort descending,
# and take the top row
dd = d.groupby('name').count().sort(F.col('count').desc())
display(dd.take(1))
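
If you want to stay with this groupby/join style but still break the satya tie by latest date, one possible continuation is sketched below. It reuses d and ddd as built in the question and assumes the date column has already been parsed (df_with_date from the first answer); treat it as an illustration rather than a tested solution.

import pyspark.sql.functions as F

# Latest transaction date per (name, city), taken from the parsed frame
latest = (df_with_date
    .groupby('name', 'city')
    .agg(F.max('date').alias('max_date')))

# Attach those dates to the max-count candidates ...
candidates = ddd.join(latest, ['name', 'city'])

# ... and keep, per name, the candidate city with the most recent date
best_date = candidates.groupby('name').agg(F.max('max_date').alias('max_date'))
fav = (candidates
    .join(best_date, ['name', 'max_date'])
    .select('name', 'city', 'count', 'max_date'))

For satya this keeps Pune (count 2, 2016-11-29); if two cities ever tie on both count and date, both rows survive, which mirrors the tie flag from the first answer.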
answered Sep 30 '22 by Vijay Sukumar