I ran into a surprising behavior when using .select()
:
>>> my_df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 3| 5|
| 2| 4| 6|
+---+---+---+
>>> a_c = s_df.select(col("a"), col("c")) # removing column b
>>> a_c.show()
+---+---+
| a| c|
+---+---+
| 1| 5|
| 2| 6|
+---+---+
>>> a_c.filter(col("b") == 3).show() # I can still filter on "b"!
+---+---+
| a| c|
+---+---+
| 1| 5|
+---+---+
This behavior got my wondering... Are my following points correct?
DataFrames are just views, a simple DataFrame is a view of itself. In my case a_c
is just a view into my_df
.
When I created a_c
no new data was created, a_c
is just pointing at the same data my_df
is pointing.
If there is additional information that is relevant, please add!
This is happening because of the lazy nature of Spark. It is "smart" enough to push the filter down so that it happens at a lower level - before the filter*. So, since this all happens within the same stage of execution and is able to still be resolved. In fact you can see this in explain
:
== Physical Plan ==
*Project [a#0, c#2]
+- *Filter (b#1 = 3) <---Filter before Project
+- LocalTableScan [A#0, B#1, C#2]
You can force a shuffle and new stage, then see your filter fail, though. Even catching it at compile time. Here's an example:
a_c.groupBy("a","c").count.filter(col("b") === 3)
*There is also a projection pruning that pushes the selection down to database layers if it realizes it doesn't need the column at any point. However I believe the filter would cause it to "need" it and not prune...but I didn't test that.
Let us start with some basics about the spark underlying.This will make your understanding easy. RDD : Underlying the spark core is the data structure called RDD ,which are lazily evaluated. By lazy evaluation we mean that RDD computation happens when the action (like calling a count in RDD or show in dataset).
Dataset or Dataframe(which Dataset[Row]) also uses RDDs at the core.
This means every transformation (like filter) will be realized only when the action is triggered (show).
So your question
"When I created a_c no new data was created, a_c is just pointing at the same data my_df is pointing."
As there is no data which was realized. We have to realize it to bring it to memory. Your filter works on the initial dataframe.
The only way to make your a_c.filter(col("b") == 3).show()
throw a run time exception is to cache your intermediate dataframe by using dataframe.cache.
So spark will throw"main" org.apache.spark.sql.AnalysisException: Cannot resolve column name
Eg.
val a_c = s_df.select(col("a"), col("c")).cache
a_c.filter(col("b") == 3).show()
So spark will throw"main" org.apache.spark.sql.AnalysisException: Cannot resolve column name.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With