Given a dataframe, I am trying to compute how many times I have seen an emailId in the past 30 days. The main logic in my function is the following:
val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))
The config:
spark.executor.cores=5
So I can see 5 stages that contain window functions. Some of those stages complete very quickly (in a few seconds), but 2 did not finish even after 3 hours, stuck at the last few tasks (progressing very slowly):
This looks like a data-skew problem to me: if I remove all the rows containing the 5 highest-frequency email ids from the dataset, the job finishes quickly (in less than 5 minutes).
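One quick way to confirm that hypothesis is to count rows per key and look at the heaviest ones; in Spark this would be something like `df.groupBy("email").count().orderBy(desc("count")).show(5)`. Below is the same check sketched on plain Scala collections, with made-up data (the names are illustrative only):

```scala
// Illustrative skew check on plain Scala collections; in the real job this
// would be a Spark aggregation on the "email" column. The data here is
// made up purely to show the shape of the check.
val emails = Seq(
  "heavy@example.com", "heavy@example.com", "heavy@example.com",
  "heavy@example.com", "light1@example.com", "light2@example.com")

// Rows per key: a heavily skewed key owns most of the rows, and with
// partitionBy("email") all of them land in a single window partition.
val perEmail: Map[String, Int] =
  emails.groupBy(identity).map { case (e, rows) => (e, rows.size) }

// The 5 highest-frequency keys -- the ones whose removal makes the job fast.
val top5 = perEmail.toSeq.sortBy(-_._2).take(5)
println(top5)
```

Whatever keys come out on top here are the ones whose window partitions dominate the slow stages.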
If I use some other key within the window partitionBy, the job finishes in a few minutes:
Window.partitionBy("email", "date")
But obviously it produces wrong counts if I do that, so it is not an acceptable solution.
I have tried various other Spark settings, throwing more memory, cores, parallelism, etc. at the problem, and none of them seemed to help.
Spark Version: 2.2
Current Spark configuration:
-executor-memory: 100G
-executor-cores: 5
-driver-memory: 80G
-spark.executor.memory=100g
Using machines with 16 cores and 128 GB memory each; up to 500 nodes maximum.
What would be the right way to tackle this problem?
Update: Just to give more context, here are the original dataframe and the corresponding expected dataframe:
val df = Seq(
  ("a@example.com", "2019-10-01 00:04:00"),
  ("a@example.com", "2019-11-02 01:04:00"),
  ("a@example.com", "2019-11-22 02:04:00"),
  ("a@example.com", "2019-11-22 05:04:00"),
  ("a@example.com", "2019-12-02 03:04:00"),
  ("a@example.com", "2020-01-01 04:04:00"),
  ("a@example.com", "2020-03-11 05:04:00"),
  ("a@example.com", "2020-04-05 12:04:00"),
  ("b@example.com", "2020-05-03 03:04:00")
).toDF("email", "transaction_timestamp")
val expectedDF = Seq(
  ("a@example.com", "2019-10-01 00:04:00", 1),
  ("a@example.com", "2019-11-02 01:04:00", 1), // previous one falls outside the last-30-days window
  ("a@example.com", "2019-11-22 02:04:00", 2),
  ("a@example.com", "2019-11-22 05:04:00", 3),
  ("a@example.com", "2019-12-02 03:04:00", 3),
  ("a@example.com", "2020-01-01 04:04:00", 1),
  ("a@example.com", "2020-03-11 05:04:00", 1),
  ("a@example.com", "2020-04-05 12:04:00", 2),
  ("b@example.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", "count")
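As a sanity check on those expected counts, here is a plain-Scala sketch of the rolling 30-day count (not the Spark job itself; the email addresses are placeholders, since the originals are redacted in the post). It just pins down the semantics of rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow):

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Rolling count over the last 30 days, per email, on the sample data.
// Placeholder addresses stand in for the redacted ones.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val thirtyDays = 30L * 24 * 3600

val sample = Seq(
  ("a@example.com", "2019-10-01 00:04:00"),
  ("a@example.com", "2019-11-02 01:04:00"),
  ("a@example.com", "2019-11-22 02:04:00"),
  ("a@example.com", "2019-11-22 05:04:00"),
  ("a@example.com", "2019-12-02 03:04:00"),
  ("a@example.com", "2020-01-01 04:04:00"),
  ("a@example.com", "2020-03-11 05:04:00"),
  ("a@example.com", "2020-04-05 12:04:00"),
  ("b@example.com", "2020-05-03 03:04:00"))

val epochs = sample.map { case (email, t) =>
  (email, LocalDateTime.parse(t, fmt).toEpochSecond(ZoneOffset.UTC))
}

// For each row, count rows of the same email with a timestamp in [t - 30 days, t].
val exactCounts = epochs.map { case (email, t) =>
  epochs.count { case (e, u) => e == email && u >= t - thirtyDays && u <= t }
}
println(exactCounts) // List(1, 1, 2, 3, 3, 1, 1, 2, 1)
```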
Some of your partitions are probably too large, because for some emails there is too much data in one month.
To fix this, you can create a new dataframe with only the emails and the timestamps. Then you group by email and timestamp, count the number of rows, and compute the window on hopefully much less data. The computation is sped up if timestamps tend to be duplicated, that is, if df.count is much greater than df.select("email", "timestamp").distinct.count. If that is not the case, you can truncate the timestamps at the cost of losing some precision. This way, instead of counting the number of occurrences within the last 30 days (give or take one second, since timestamps are in seconds), you would count the number of occurrences give or take one minute, one hour, or even one day, depending on your need. You would lose a bit of precision but speed up your computation a lot. And the more precision you give up, the more speed you gain.
The code would look like this:
// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data.
val precisionLoss = 3600
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

val new_df = df.withColumn("truncated_timestamp",
  (unix_timestamp($"timestamp") / precisionLoss).cast("long"))

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count).over(winSpec))

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))
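To see the effect of the truncation on concrete numbers, the same pre-aggregation idea can be sketched in plain Scala on the question's sample data (placeholder addresses again, since the post redacts them): truncate to hourly buckets, count rows per (email, bucket), then take a rolling sum over the last 720 buckets (30 days of hours). On this particular sample the hourly approximation happens to reproduce the exact counts:

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val precisionLoss = 3600L                      // hourly buckets
val winSize = 30L * 24 * 3600 / precisionLoss  // 720 buckets = 30 days

val sample = Seq(
  ("a@example.com", "2019-10-01 00:04:00"),
  ("a@example.com", "2019-11-02 01:04:00"),
  ("a@example.com", "2019-11-22 02:04:00"),
  ("a@example.com", "2019-11-22 05:04:00"),
  ("a@example.com", "2019-12-02 03:04:00"),
  ("a@example.com", "2020-01-01 04:04:00"),
  ("a@example.com", "2020-03-11 05:04:00"),
  ("a@example.com", "2020-04-05 12:04:00"),
  ("b@example.com", "2020-05-03 03:04:00"))

// Truncate each timestamp to its hourly bucket (the withColumn step).
val buckets = sample.map { case (email, t) =>
  (email, LocalDateTime.parse(t, fmt).toEpochSecond(ZoneOffset.UTC) / precisionLoss)
}

// Count rows per (email, bucket) -- the groupBy(...).count step.
val perBucket: Map[(String, Long), Int] =
  buckets.groupBy(identity).map { case (k, rows) => (k, rows.size) }

// Rolling sum of bucket counts over the last winSize buckets -- the window step.
val approxCounts = buckets.map { case (email, b) =>
  perBucket.collect {
    case ((e, b2), c) if e == email && b2 >= b - winSize && b2 <= b => c
  }.sum
}
println(approxCounts) // List(1, 1, 2, 3, 3, 1, 1, 2, 1)
```

In general the truncated count can differ from the exact one near the window boundary; here every gap that matters is clearly inside or outside 720 hours, so the results coincide.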