
Performance Of Joins in Spark-SQL

Assume that we have a healthy cluster and, for the use case, two datasets with 1 billion+ records each.

We need to compare both datasets and find the duplicates in the original dataset.

I was planning to write a SQL query with a join on the columns that are to be checked for duplicates; a rough sketch of that query is shown below.

I wanted to know how the performance of this query will be, and also what refinements can be done to the datasets (dataframe partitioning) before joining them.
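The kind of query I have in mind (the temp view names and the col1/col2 join columns are just placeholders for the real schema, and spark is assumed to be a Spark 2.0 SparkSession):

// Register both datasets as temp views; col1/col2 stand in for the columns
// that are to be checked for duplicates.
originalDf.createOrReplaceTempView("original")
otherDf.createOrReplaceTempView("other")

// Rows of the original dataset that also appear in the other dataset,
// matched on the candidate columns
val duplicates = spark.sql("""
  SELECT o.*
  FROM original o
  JOIN other t
    ON o.col1 = t.col1 AND o.col2 = t.col2
""")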

Please do pitch in with your observations.

asked Sep 21 '16 by Aviral Kumar


1 Answer

I wanted to know how the performance of this query will be

Compared to what? As for absolute numbers, I think it will obviously depend on your data and your cluster.

However, in Spark 2.0 the performance improvements are quite significant.

and also what refinements can be done

The Catalyst optimizer is pretty good (more so after 2.0). Under the hood, it takes care of most of the optimizations for you, like column pruning, predicate pushdown, etc. (In 2.0 there is also whole-stage code generation, which produces very optimized code and yields large performance improvements.)

And these same improvements are available across the board, whether you use the DataFrame/Dataset API or SQL.
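For instance, a join expressed through the DataFrame API and the same join expressed in SQL over temp views both go through the same Catalyst optimizer (a minimal sketch; dfA and dfB are hypothetical dataframes sharing a name column):

// DataFrame/Dataset API version
val joinedApi = dfA.join(dfB, dfA("name") === dfB("name"))

// SQL version over temp views -- optimized by the same Catalyst engine
dfA.createOrReplaceTempView("a")
dfB.createOrReplaceTempView("b")
val joinedSql = spark.sql("SELECT * FROM a JOIN b ON a.name = b.name")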

As an example of the kind of query optimizations that Spark's Catalyst does, let's say you have two dataframes df1 and df2 with the same schema (as in your case) and you want to join them on some columns to get only the intersection and output those tuples.

Let's say the schema of the data frames is as follows (the result of calling df.schema):

StructType(
StructField(df.id,StringType,true), 
StructField(df.age,StringType,true), 
StructField(df.city,StringType,true), 
StructField(df.name,StringType,true))

That is, we have id, age, city and name columns in both data sets.
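(As a side note, a minimal sketch of how dataframes with this schema might be loaded; the paths are placeholders, and JSON is assumed because of the Scan json nodes in the plan below. The df1./df2. column prefixes seen later come from renaming the columns after loading, which is not shown here.)

// Hypothetical paths; any source with string columns id, age, city and name would do
val df1 = spark.read.json("/path/to/first/dataset")
val df2 = spark.read.json("/path/to/second/dataset")

// df1.schema then returns a StructType like the one shown above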

Now, given what you want to do, you will do something like:

df1.join(
    df2, 
    $"df2.name"===$"df1.name"
       ).select("df1.id","df1.name", "df1.age", "df1.city" ).show
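You can print the plan that Catalyst produces for this query by calling explain on the resulting dataframe (explain(true) also shows the parsed, analyzed and optimized logical plans):

// Prints the physical plan shown below instead of executing the query
df1.join(
    df2,
    $"df2.name" === $"df1.name"
  ).select("df1.id", "df1.name", "df1.age", "df1.city")
   .explain()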

If you look at the physical plan of the above, you will notice many optimizations done under the hood by the Catalyst optimizer:

== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
   :- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
   :  +- *Filter isnotnull(name#99)
   :     +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [name#99 AS df2.name#880]
         +- *Filter isnotnull(name#99)
            +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>


In particular, notice that even though the same two dataframes are being joined, they are being read differently:

  1. Column pruning and predicate pushdown: from the query it is evident to Spark that for df2 all you need is the name column (and not the entire record with id, age, etc.). Wouldn't it be great if this information were pushed down to where the data is being read? That would save us from reading unnecessary data that we do not plan to use. And that is exactly what Spark did: for that side of the join it reads only the name column, as this line shows:

     +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>

     For the other side, df1, we want all four columns in the result after the join. Again Spark figures this out and reads all four columns for that side:

     +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>

  2. Also, just after reading and before the join, Spark figured out that you are joining on the name column, so before the join it removed the tuples whose name is null:

     +- *Filter isnotnull(name#99)

This means that Spark is already doing all this heavy lifting for you so that the minimum data is read and brought into memory (thus reducing shuffle and compute time).

However, for your specific case you might want to think about whether you can reduce the data read even further, at least for one side of the join. What if many rows in df2 have the same combination of keys against which you are matching df1? Would you not be better off first doing a distinct on df2? I.e., something like:

df1.join(
    df2.select("df2.name").distinct, 
    $"df2.name"===$"df1.name"
   ).select("df1.id","df1.name", "df1.age", "df1.city" )

answered Oct 13 '22 by Sachin Tyagi