I am seeing some performance issues while running queries using DataFrames. My research suggests that a long-running final task can be a sign that the data is not distributed optimally, but I have not found a detailed process for resolving this issue.
I start by loading two tables as DataFrames and then join them on one field. I have tried adding distribute by (repartition) and sort by to improve the performance, but I still see a single long-running final task. Here is a simple version of my code; note that queries one and two are not actually this simple and use UDFs to calculate some values.
I have tried a few different settings for spark.sql.shuffle.partitions. A value of 100 failed (I didn't really debug this too much, to be honest). I tried 300, 4000, and 8000, and performance decreased with each increase. I am selecting a single day of data, where each file is an hour.
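For reference, the setting can be changed per SQLContext like this (300 being just one of the values tried, not a recommendation):

// Number of partitions used when shuffling data for joins/aggregations.
sqlContext.setConf("spark.sql.shuffle.partitions", "300")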
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

// Repartition and sort each side by the join key.
val distributeDf1 = df1
  .repartition(df1("userId"))
  .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
  .repartition(df2("userId"))
  .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

// Join the pre-partitioned tables on userId, keeping all rows from df1.
val df3 = sqlContext
  .sql("""
    Select
      df1.*
    from
      df1
    left outer join df2 on
      df1.userId = df2.userId""")
Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use Date + Hour? If I have fewer than 200 unique combos for this, will I have empty executors?
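To make that alternative concrete, it would look roughly like this; the date and hour column names are just placeholders for however the timestamp gets split:

// Hypothetical: repartition by time instead of userId.
val distributeByTime = df1
  .repartition(df1("date"), df1("hour"))
  .sortWithinPartitions(df1("date"), df1("hour"))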
Spark >= 3.0
Since 3.0 Spark provides built-in optimizations for handling skewed joins, which can be enabled through adaptive query execution (the spark.sql.adaptive.skewJoin.enabled property in released 3.x versions, together with spark.sql.adaptive.enabled).
See SPARK-29544 for details.
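In released 3.x versions a minimal way to turn this on (with spark being a SparkSession) is:

// Skew-join handling is part of adaptive query execution (AQE).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")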
Spark < 3.0
You clearly have a problem with a huge right data skew. Let's take a look at the statistics you've provided:
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
With mean around 5 and standard deviation over 2000 you get a long tail.
Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.
Furthermore, your description suggests that the problem may be a single key (or a few keys) that hash to the same partition.
So, let's first identify outliers (pseudocode):
import sqlContext.implicits._ // for the $"..." column syntax

val mean = 4.989209978967438
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")

// Per-key row counts, cached because it is filtered twice below.
val counts = df1.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))
and the rest:
val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))
Is it really something to be expected? If not, try to identify the source of the issue upstream.
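A quick way to check is to look at the heaviest keys in the counts DataFrame computed above, for example:

// Top userIds by row count; a handful of very large values confirms the skew.
counts.orderBy($"count".desc).show(20)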
If it is expected, you can try:
broadcasting the smaller table:

import org.apache.spark.sql.functions.broadcast

val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")
splitting, unifying (union) and broadcasting only the frequent part:

df2.join(broadcast(frequent), Seq("userId"), "rightouter")
  .union(df2.join(infrequent, Seq("userId"), "rightouter"))
salting userId with some random data (a rough sketch follows at the end of this answer)
but you shouldn't:
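Regarding the salting option above, here is a rough sketch of the idea, reusing df1 and df2 from the question; the salt width of 16 and the salt column name are illustrative assumptions, not tuned values:

import org.apache.spark.sql.functions.{array, col, explode, lit, rand}

val saltBuckets = 16 // assumed width; tune to the observed skew

// Skewed side: tag each row with a random salt in [0, saltBuckets).
val df1Salted = df1.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Other side: replicate each row once per possible salt value.
val df2Salted = df2.select(
  col("*"),
  explode(array((0 until saltBuckets).map(lit): _*)).as("salt"))

// Join on the composite key so rows for a heavy userId spread over saltBuckets partitions.
val joined = df1Salted
  .join(df2Salted, Seq("userId", "salt"), "left_outer")
  .drop("salt")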