I have skewed data in a table which is joined against another, small table. I understand how salting works for joins: a random number from a fixed range is appended to the keys of the big, skewed table, and the rows of the small, non-skewed table are duplicated once for each value in that range. The match still happens because, for any salted key from the skewed table, one of the duplicated keys in the small table will hit. I have also read that salting helps when performing groupBy. My question is: when random numbers are appended to the key, doesn't that break the group? If it does, then the meaning of the group by operation has changed.
We need to change/rewrite our ETL logic to perform the left join with only the not-null keys and then union the null-key rows back in, since null keys will never participate in the join anyway. On a table with a large number of null keys, this avoids shuffling those rows and the resulting GC pause issue.
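A minimal sketch of that split, assuming a fact DataFrame with a nullable join key ID and a small dim lookup that only contributes a SALARY column (the names are hypothetical and mirror the worked example further down):

import org.apache.spark.sql.functions._

// Rows whose key is null can never match, so keep them out of the join entirely.
val notNullFact = fact.filter(col("ID").isNotNull)
val nullFact    = fact.filter(col("ID").isNull)

// Only the non-null keys go through the shuffle, so the huge "null" hot key
// never piles up in a single partition.
val joined = notNullFact.join(dim, Seq("ID"), "left")

// Append the null-key rows back, padding the column the join would have added.
val result = joined.unionByName(nullFact.withColumn("SALARY", lit(null).cast("int")))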
A data transformation may be used to reduce skewness. A distribution that is symmetric or nearly so is often easier to handle and interpret than a skewed distribution. More specifically, a normal or Gaussian distribution is often regarded as ideal as it is assumed by many statistical methods.
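As an illustration (a generic sketch, not tied to any table above), a log transform is one common way to pull a long right tail in towards the centre; the "value" column name here is hypothetical:

import org.apache.spark.sql.functions.{col, log1p}

// log1p(x) = log(1 + x): it compresses large values far more than small ones,
// so a heavily right-skewed "value" column ends up much closer to symmetric.
val transformed = df.withColumn("log_value", log1p(col("value")))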
Let's see how to identify and mitigate skew in your data. Step 1: Read the data from the table into a DataFrame. Step 2: Find the number of rows per partition. Here, the function spark_partition_id() returns the current partition id; by plotting the result you will notice the skew. A sketch of both steps follows.
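A minimal sketch of those two steps (the table name is a placeholder):

import org.apache.spark.sql.functions.spark_partition_id

// Step 1: read the table into a DataFrame.
val df = spark.read.table("my_table")

// Step 2: count rows per partition; a handful of partitions with much larger
// counts than the rest is the signature of skew.
df.groupBy(spark_partition_id().as("partition_id"))
  .count()
  .orderBy("partition_id")
  .show(false)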
My question is: when random numbers are appended to the key, doesn't it break the group?
Well, it does. To mitigate this you can run the group by operation twice: first with the salted key, then remove the salt and group again. The second grouping takes the partially aggregated data, which significantly reduces the skew impact.
E.g.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// "key" and "value" are placeholder column names; n is the number of salt buckets.
df.withColumn("salt", (rand() * n).cast(IntegerType))
  .groupBy("salt", "key")
  .agg(sum("value").as("partial_sum"))    // pass 1: partial aggregates per (salt, key)
  .groupBy("key")
  .agg(sum("partial_sum").as("total"))    // pass 2: merge partials, salt dropped from the grouping
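Note that this only works for aggregates that can be recombined from partial results (a sum of partial sums, a sum of partial counts, a max of partial maxes). Something like an average or a distinct count has to be rewritten, e.g. carry a sum and a count through the first pass and divide only at the end.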
// Skewed fact table: the key 1 appears far more often than any other key.
val df1 = Seq((1,"a"),(2,"b"),(1,"c"),(1,"x"),(1,"y"),(1,"g"),(1,"k"),(1,"u"),(1,"n")).toDF("ID","NAME")
df1.createOrReplaceTempView("fact")

// Small dimension table with unique keys.
val df2 = Seq((1,10),(2,30),(3,40)).toDF("ID","SALARY")
df2.createOrReplaceTempView("dim")
// Append a random salt bucket in [0, 19] to every (skewed) fact key.
val salted_df1 = spark.sql("""select concat(ID, '_', FLOOR(RAND(123456)*20)) as salted_key, NAME from fact """)
salted_df1.createOrReplaceTempView("salted_fact")
// Duplicate every dim row once per salt bucket so each salted fact key can find a match.
val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(array(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) as salted_key from dim""")
// More concise equivalent on Spark 2.4+:
// val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(sequence(0, 19)) as salted_key from dim""")
exploded_dim_df.createOrReplaceTempView("salted_dim")
// Join on the salted keys, then strip the salt to recover the original ID.
val result_df = spark.sql("""select split(fact.salted_key, '_')[0] as ID, dim.SALARY
from salted_fact fact
LEFT JOIN salted_dim dim
ON fact.salted_key = concat(dim.ID, '_', dim.salted_key) """)

// display() is Databricks-specific; use result_df.show() elsewhere.
display(result_df)
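Each skewed fact key is spread across the salt buckets while every dim row is duplicated once per bucket, so every salted fact key still finds its match. Also note that split(salted_key, '_')[0] returns the ID as a string, so cast it back if you need the original integer type.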