I have a DataFrame with the following columns.
scala> show_times.printSchema
root
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- show_name: string (nullable = true)
 |-- total_time_watched: integer (nullable = true)
This is data about how many times customer has watched watched a particular show. I'm supposed to categorize the customer for each show based on total time watched.
The dataset has 133 million rows in total with 192 distinct show_names.
For each individual show I'm supposed to bin the customer into 3 categories (1,2,3).
I use Spark MLlib's QuantileDiscretizer
Currently I loop through every show and run QuantileDiscretizer in the sequential manner as in the code below.

What I'd like to have in the end is for the following sample input to get the sample output.
Sample Input:
account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Sample Output:
account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Is there a more efficient and distributed way to do it using some groupBy-like operation instead of looping through each show_name and bin it one after other?
I know nothing about QuantileDiscretizer, but think you're mostly concerned with the dataset to apply QuantileDiscretizer to. I think you want to figure out how to split your input dataset into smaller datasets per show_name (you said that there are 192 distinct show_name in the input dataset).
I've noticed that you use parquet as the input format. My understanding of the format is very limited but I've noticed that people are using some partitioning scheme to split large datasets into smaller chunks that they could then process whatever they like (per some partitioning scheme).
In your case the partitioning scheme could include show_name.
That would make your case trivial as the splitting were done at writing time (aka not my problem anymore).
See How to save a partitioned parquet file in Spark 2.1?
Given your iterative solution, you could wrap every iteration into a Future that you'd submit to process in parallel.
Spark SQL's SparkSession (and Spark Core's SparkContext) are thread-safe.
filter and union operatorsI would think twice before following this solution since it puts burden on your shoulders which I think could easily be sorted out by solution 1.
Given you've got one large 133-million-row parquet file, I'd first build the 192 datasets per show_name using filter operator (as you did to build show_rdd which is against the name as it's a DataFrame not RDD) and union (again as you did).
See Dataset API.
That's something I think could work, but didn't check it out myself.
You could use window functions (see WindowSpec and Column's over operator).
Window functions would give you partitioning (windows) while over would somehow apply QuantileDiscretizer to a window/partition. That would however require "destructuring" QuantileDiscretizer into an Estimator to train a model and somehow fit the result model to the window again.
I think it's doable, but haven't done it myself. Sorry.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With