Assuming that we have a healthy cluster, the use case is this: we have two datasets with 1 billion+ records each, and we need to compare them to find the duplicates in the original dataset. I was planning to write a SQL query with a join on the columns that are to be checked for duplicates. I wanted to know what the performance of this query will be, and also what refinements can be made to the datasets (DataFrame partitioning) before joining them. Please do pitch in with your observations.
"I wanted to know what the performance of this query will be"
Compared to what? As for absolute numbers, I think it will obviously depend on your data and your cluster.
However, in Spark 2.0 the performance improvements are quite significant.
"and also what refinements can be made"
The Catalyst optimizer is pretty good (more so after 2.0). Under the hood it takes care of most of your optimizations, such as column pruning and predicate pushdown. (In 2.0 there is also whole-stage code generation, which generates highly optimized code and yields very large performance improvements.)
And these same improvements are available across the board, whether you use the DataFrame/Dataset API or SQL.
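As an illustration of that parity, here is a sketch only: it assumes two DataFrames df1 and df2 like the ones introduced below, a plain name column, and a SparkSession named spark (available by default in spark-shell). Registering the DataFrames as temporary views and expressing the same join in SQL goes through the same optimizer and ends up with an equivalent plan:
// Sketch: the same join expressed through SQL instead of the DataFrame API.
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("SELECT t1.* FROM t1 JOIN t2 ON t1.name = t2.name").explain()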
As an example of the kind of query optimization Spark's Catalyst does, let's say you have two DataFrames df1 and df2 with the same schema (as in your case), and you want to join them on some columns to get only the intersection and output those tuples.
Let's say the schema of my DataFrames is the following (from calling df.schema):
StructType(
StructField(df.id,StringType,true),
StructField(df.age,StringType,true),
StructField(df.city,StringType,true),
StructField(df.name,StringType,true))
That is, we have id, age, city, and name columns in both datasets.
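As a rough sketch of how such DataFrames might be created (the physical plan below shows a JSON scan, so I'll assume the datasets are JSON files at placeholder paths, read through a SparkSession named spark):
// Placeholder paths; point these at your actual datasets.
val df1 = spark.read.json("/path/to/dataset1")
val df2 = spark.read.json("/path/to/dataset2")
df1.printSchema()   // check that the columns match the schema shown above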
Now, given what you want to do, you would write something like:
// Join on the name column and keep df1's columns for the matching rows.
df1.join(
  df2,
  $"df2.name" === $"df1.name"
).select("df1.id", "df1.name", "df1.age", "df1.city").show
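To see what the optimizer actually did, you can print the physical plan by calling explain on the joined DataFrame instead of show (a sketch, same setup as above):
// Print the physical plan rather than the rows.
df1.join(
  df2,
  $"df2.name" === $"df1.name"
).select("df1.id", "df1.name", "df1.age", "df1.city").explain()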
If you look at the physical plan of the above, you will notice many optimizations done under the hood by the Catalyst optimizer:
== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
:- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
: +- *Filter isnotnull(name#99)
: +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [name#99 AS df2.name#880]
+- *Filter isnotnull(name#99)
+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
In particular, notice that even though two identical DataFrames are being joined, they are read differently:
Column pruning (projection pushdown): from the query it is evident to Spark that for df2 all you need is the name column (and not the entire record with id, age, etc.). Wouldn't it be great if this information were pushed down to where the data is read? That would save you from reading unnecessary data that you never use. That is exactly what Spark does: for that side of the join it reads only the name column. This line:
+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
For the other side, df1, we want all four columns in the result after the join. Again Spark figures this out, and for that side it reads all four columns. This line:
+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
Predicate pushdown: just after reading and before the join, Spark figured out that you're joining on the name column, so before the join it removes the tuples whose name is null (the filter is even pushed into the scan, as PushedFilters: [IsNotNull(name)]). This line:
+- *Filter isnotnull(name#99)
This means that Spark is already doing all this heavy lifting for you, so that the minimum amount of data is read and brought into memory (thus reducing shuffle and compute time).
However, for your specific case you might want to think about whether you can reduce the data read even further, at least for one side of the join. What if many rows in df2 have the same combination of key values that you're matching df1 against? Would you not be better off first doing a distinct on df2? That is, something like:
df1.join(
  df2.select("df2.name").distinct,
  $"df2.name" === $"df1.name"
).select("df1.id", "df1.name", "df1.age", "df1.city")
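If you want to verify the effect, here is a quick sketch (same assumptions as above): print the plan of this variant and the distinct should show up as an aggregate over just the projected name column before the join, and you can then count the matching rows.
val matches = df1.join(
  df2.select("df2.name").distinct,
  $"df2.name" === $"df1.name"
).select("df1.id", "df1.name", "df1.age", "df1.city")
matches.explain()   // the distinct appears as an aggregate on the single name column
matches.count()     // e.g. how many rows of df1 have a match in df2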