
Spark window partition function taking forever to complete

Given a dataframe, I am trying to compute how many times I have seen an emailId in the past 30 days. The main logic in my function is the following:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, unix_timestamp}
import org.apache.spark.sql.types.LongType
import spark.implicits._ // assumes a SparkSession named spark

// 30 days expressed in seconds, used as the window range.
val NumberOfSecondsIn30Days = 30L * 24 * 60 * 60

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

// For each transaction, count that email's rows in the trailing 30 days.
val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

The config:

spark.executor.cores=5

So, I can see 5 stages that contain window functions. Some of those stages complete very quickly (within a few seconds), but 2 of them did not finish even after 3 hours, stuck at the last few tasks (progressing very slowly).

This looks like a data skew problem to me: if I remove all rows containing the 5 highest-frequency email ids from the dataset, the job finishes quickly (in less than 5 minutes).
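For reference, this is the kind of check that surfaces the skew (a sketch over the same df as above; nothing here comes from the actual job):

import org.apache.spark.sql.functions.desc

// List the heaviest email keys; a handful of very large groups
// means the corresponding window partitions are huge.
df.groupBy("email")
  .count()
  .orderBy(desc("count"))
  .show(10, truncate = false)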

If I try to use some other key within window partitionBy, the job finishes in a few minutes:

 Window.partitionBy("email", "date")

But that obviously produces wrong counts, so it is not an acceptable solution.

I have tried various other Spark settings, throwing more memory, cores, parallelism, etc. at the problem, and none of them seemed to help.

Spark Version: 2.2

Current Spark configuration:

-executor-memory: 100G
-executor-cores: 5
-driver-memory: 80G
-spark.executor.memory=100g

Using machines with 16 cores and 128 GB of memory each, with a maximum of up to 500 nodes.

What would be the right way to tackle this problem?

Update: Just to give more context, here is the original dataframe and the corresponding expected output:

 val df = Seq(
      ("user1@example.com", "2019-10-01 00:04:00"),
      ("user1@example.com", "2019-11-02 01:04:00"),
      ("user1@example.com", "2019-11-22 02:04:00"),
      ("user1@example.com", "2019-11-22 05:04:00"),
      ("user1@example.com", "2019-12-02 03:04:00"),
      ("user1@example.com", "2020-01-01 04:04:00"),
      ("user1@example.com", "2020-03-11 05:04:00"),
      ("user1@example.com", "2020-04-05 12:04:00"),
      ("user2@example.com", "2020-05-03 03:04:00")
    ).toDF("email", "transaction_timestamp")


val expectedDF = Seq(
      ("user1@example.com", "2019-10-01 00:04:00", 1),
      ("user1@example.com", "2019-11-02 01:04:00", 1), // previous one falls outside the last-30-days window
      ("user1@example.com", "2019-11-22 02:04:00", 2),
      ("user1@example.com", "2019-11-22 05:04:00", 3),
      ("user1@example.com", "2019-12-02 03:04:00", 3),
      ("user1@example.com", "2020-01-01 04:04:00", 1),
      ("user1@example.com", "2020-03-11 05:04:00", 1),
      ("user1@example.com", "2020-04-05 12:04:00", 2),
      ("user2@example.com", "2020-05-03 03:04:00", 1)  // new email
).toDF("email", "transaction_timestamp", "count")
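On this small sample the expected counts can be reproduced with the same window logic as in the main snippet (a sketch; the intermediate names sample, win, computed, and ts are mine, and the string timestamp is converted to epoch seconds first):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, unix_timestamp}

// Convert the string timestamp to epoch seconds, then count per email
// over the trailing 30-day range.
val sample = df.withColumn("ts", unix_timestamp(col("transaction_timestamp")))

val win = Window
  .partitionBy("email")
  .orderBy(col("ts"))
  .rangeBetween(-30L * 24 * 3600, Window.currentRow)

val computed = sample
  .withColumn("count", count(col("email")).over(win))
  .drop("ts")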
1 Answer

Some of your partitions are probably too large, because for some emails there is too much data within a single month.

To fix this, you can create a new dataframe with only the emails and the timestamps. Then you group by email and timestamp, count the number of lines, and compute the window over hopefully much less data. The computation will be sped up if timestamps tend to be duplicated, that is, if df.count is much greater than df.select("email", "timestamp").distinct.count.

If that is not the case, you can truncate the timestamps at the cost of losing some precision. This way, instead of counting the number of occurrences within the last 30 days (give or take one second, since timestamps are in seconds), you would count the number of occurrences give or take one minute, one hour, or even one day, depending on your needs. You would lose a bit of precision but speed up your computation a lot. And the more precision you give up, the more speed you gain.

The code would look like this:

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for daily precision.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data.
val precisionLoss = 3600
val win_size = NumberOfSecondsIn30Days / precisionLoss

// The window ranges over truncated timestamps, so the bound is expressed
// in units of precisionLoss seconds.
val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

val new_df = df.withColumn("truncated_timestamp",
  (unix_timestamp($"timestamp") / precisionLoss).cast("long"))

// Pre-aggregate per (email, truncated_timestamp), then sum the bucket counts
// over the window: far less data goes through the window function.
val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

// Join the approximate counts back onto the original rows.
val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))
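The joined result then carries the approximate count on every original row, and the helper column can be dropped afterwards, for example (a sketch; column names as in the snippet above):

result
  .drop("truncated_timestamp")
  .orderBy($"email", $"timestamp")
  .show(truncate = false)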