
Spark: How does salting work in dealing with skewed data?

I have skewed data in a large table, which is then compared with another table that is small. I understand how salting works for joins: a random number from a fixed range is appended to each key in the big, skewed table, and every row of the small, unskewed table is duplicated once for each number in that range. The match still happens because exactly one of the duplicated rows carries the same salt as a given salted key from the skewed table. I also read that salting is helpful when performing a group by. My question is when random numbers are appended to the key doesn't it break the group? If it does, then the meaning of the group by operation has changed.

asked Sep 26 '19 05:09 by Bishamon Ten



2 Answers

My question is when random numbers are appended to the key doesn't it break the group?

Well, it does. To mitigate this, you can run the group by operation twice: first on the salted key, then again on the original key with the salt removed. The second grouping works on partially aggregated data, so each key has at most one row per salt value and the impact of the skew is significantly reduced.

E.g.

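import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// n is the number of salt buckets; groupByFields and aggFields are placeholders
// for your own grouping column and aggregate expression.
df.withColumn("salt", (rand() * n).cast(IntegerType))
  .groupBy("salt", groupByFields)  // first pass: partial aggregates per (salt, key)
  .agg(aggFields)
  .groupBy(groupByFields)          // second pass: drop the salt and merge the partials
  .agg(aggFields)                  // must combine the partials, e.g. sum of partial counts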
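
To see why grouping twice does not change the result, here is a minimal sketch in plain Scala collections (no Spark), standing in for the DataFrame above. The object name, the sample rows, the bucket count, and the seed are all made up for illustration; the only claim is that a sum grouped by (key, salt) and then by key equals the sum grouped by key directly.

```scala
import scala.util.Random

object SaltedGroupBySketch {
  // Hypothetical skewed input: key 1 dominates, as in the question.
  val rows: Seq[(Int, Long)] =
    Seq.fill(1000)((1, 1L)) ++ Seq.fill(10)((2, 1L)) ++ Seq.fill(5)((3, 1L))

  // Reference result: one-pass sum per key.
  def direct(data: Seq[(Int, Long)]): Map[Int, Long] =
    data.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Two-pass sum: group by (key, salt) first, then by key alone.
  def salted(data: Seq[(Int, Long)], buckets: Int, seed: Long): Map[Int, Long] = {
    val rng = new Random(seed)
    val partials: Seq[(Int, Long)] = data
      .map { case (k, v) => ((k, rng.nextInt(buckets)), v) } // attach a random salt
      .groupBy(_._1)                                         // first pass: (key, salt)
      .toSeq
      .map { case ((k, _), vs) => (k, vs.map(_._2).sum) }    // drop the salt, keep the partial sum
    direct(partials)                                         // second pass: key only
  }

  def main(args: Array[String]): Unit = {
    val expected = direct(rows)
    val actual = salted(rows, buckets = 8, seed = 42L)
    println(s"direct: $expected")
    println(s"salted: $actual")
    assert(actual == expected)
  }
}
```

This works as-is for aggregates that can be merged from partial results, such as sum, min, and max. A count has to be merged with a sum in the second pass, and an average needs the partial sum and partial count carried separately and divided at the end.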
answered Oct 21 '22 14:10 by Gelerion


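import spark.implicits._  // for toDF; already in scope in spark-shell and notebooks

// Skewed fact table: ID 1 dominates.
val df1 = Seq((1,"a"),(2,"b"),(1,"c"),(1,"x"),(1,"y"),(1,"g"),(1,"k"),(1,"u"),(1,"n")).toDF("ID","NAME")

df1.createOrReplaceTempView("fact")

// Small dimension table.
val df2 = Seq((1,10),(2,30),(3,40)).toDF("ID","SALARY")

df2.createOrReplaceTempView("dim")

// Append a random salt in 0..19 to every fact key (FLOOR(RAND * 20) covers the same range as the array below).
val salted_df1 = spark.sql("""select concat(ID, '_', FLOOR(RAND(123456)*20)) as salted_key, NAME from fact """)

salted_df1.createOrReplaceTempView("salted_fact")

// Duplicate every dim row once per salt value 0..19.
val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(array(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) as salted_key from dim""")

// Shorter equivalent on Spark 2.4+:
//val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(sequence(0, 19)) as salted_key from dim""")

exploded_dim_df.createOrReplaceTempView("salted_dim")

// Join on the salted key, then strip the salt to recover the original ID.
val result_df = spark.sql("""select split(fact.salted_key, '_')[0] as ID, dim.SALARY 
            from salted_fact fact 
            LEFT JOIN salted_dim dim 
            ON fact.salted_key = concat(dim.ID, '_', dim.salted_key) """)
result_df.show()  // or display(result_df) in a Databricks notebook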
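
The same idea can be checked without a cluster. The following is a rough sketch in plain Scala collections, reusing the fact and dim rows from above; the object name, the `buckets` value, and the seed are assumptions made for illustration. It uses an inner join rather than a LEFT JOIN, which gives the same rows here because every fact ID is present in dim. Since each salted fact key meets exactly one of the duplicated dim rows, the salted join returns the same rows as the plain join.

```scala
import scala.util.Random

object SaltedJoinSketch {
  val buckets = 20 // hypothetical salt range 0..19

  val fact: Seq[(Int, String)] =
    Seq((1, "a"), (2, "b"), (1, "c"), (1, "x"), (1, "y"), (1, "g"), (1, "k"), (1, "u"), (1, "n"))
  val dim: Seq[(Int, Int)] = Seq((1, 10), (2, 30), (3, 40))

  // Reference result: plain inner join on ID.
  def plainJoin: Seq[(Int, String, Int)] =
    fact.flatMap { case (id, name) =>
      dim.collect { case (dimId, salary) if dimId == id => (id, name, salary) }
    }

  // Salted join: random salt on the fact side, every salt value on the dim side.
  def saltedJoin(seed: Long): Seq[(Int, String, Int)] = {
    val rng = new Random(seed)
    val saltedFact: Seq[((Int, Int), String)] =
      fact.map { case (id, name) => ((id, rng.nextInt(buckets)), name) }
    val explodedDim: Map[(Int, Int), Int] =
      dim.flatMap { case (id, salary) => (0 until buckets).map(s => (id, s) -> salary) }.toMap
    // Look up each salted key, then drop the salt to recover the original ID.
    saltedFact.flatMap { case (key, name) =>
      explodedDim.get(key).map(salary => (key._1, name, salary))
    }
  }

  def main(args: Array[String]): Unit = {
    val salted = saltedJoin(seed = 123456L)
    salted.foreach(println)
    assert(salted == plainJoin)
  }
}
```

In Spark, the benefit is that the rows for the hot key are spread over up to `buckets` different join keys, and therefore over more partitions, at the cost of a dim table that is `buckets` times larger.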
answered Oct 21 '22 15:10 by Shiva