Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark DataFrame operators (nunique, multiplication)

I'm using jupyter notebook with pandas, but when i use Spark, i want to use Spark DataFrame to convert or computation instead of Pandas. Please help me convert some computation to Spark DataFrame or RDD.

DataFrame:

df =
+--------+-------+---------+--------+
| userId | item  |  price  |  value |
+--------+-------+---------+--------+
|  169   | I0111 |  5300   |   1    |
|  169   | I0973 |  70     |   1    |
|  336   | C0174 |  455    |   1    |
|  336   | I0025 |  126    |   1    |
|  336   | I0973 |   4     |   1    |
| 770963 | B0166 |   2     |   1    |
| 1294537| I0110 |  90     |   1    |
+--------+-------+---------+--------+

1. Using Pandas computing:

(1)  userItem = df.groupby(['userId'])['item'].nunique()

and result is a Series object:

+--------+------+
| userId |      |
+--------+------+
|  169   |   2  |
|  336   |   3  |
| 770963 |   1  |
| 1294537|   1  |
+--------+------+

2. Using multiplication

data_sum = df.groupby(['userId', 'item'])['value'].sum()  --> result is Series object

average_played = np.mean(userItem)  --> result is number

(2)  weighted_games_played = data_sum * (average_played / userItem)

Please help me using Spark DataFrame and Opertors on Spark to do this (1) and (2)

like image 366
Phong Nguyen Avatar asked Sep 27 '17 07:09

Phong Nguyen


1 Answers

You can achieve (1) using something like the following:

import pyspark.sql.functions as f
userItem=df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))

and for (2):

data_sum=df.groupby(['userId','item']).agg(f.sum('value').alias('sum_value'))

average_played=userItem.agg(f.mean('n_item').alias('avg_played'))

data_sum=data_sum.join(userItem, on='userId').crossJoin(average_played)

data_sum=data_sum.withColumn("weighted_games_played", f.expr("sum_value*avg_played/n_item"))
like image 134
ags29 Avatar answered Nov 08 '22 23:11

ags29