What exactly does .select() do?

I ran into a surprising behavior when using .select():

>>> my_df.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  3|  5|
|  2|  4|  6|
+---+---+---+

>>> a_c = my_df.select(col("a"), col("c")) # removing column b
>>> a_c.show()
+---+---+
|  a|  c|
+---+---+
|  1|  5|
|  2|  6|
+---+---+

>>> a_c.filter(col("b") == 3).show() # I can still filter on "b"!
+---+---+
|  a|  c|
+---+---+
|  1|  5|
+---+---+

This behavior got me wondering... Are the following points correct?

DataFrames are just views; a plain DataFrame is a view of itself. In my case, a_c is just a view into my_df.

When I created a_c, no new data was created; a_c is just pointing at the same data my_df is pointing to.

If there is additional information that is relevant, please add!

Akavall asked Dec 22 '17

2 Answers

This is happening because of Spark's lazy evaluation. The optimizer is "smart" enough to push the filter down so that it runs at a lower level, before the select*. Since this all still happens within the same stage of execution, the column can still be resolved. In fact, you can see this in explain:

== Physical Plan ==
*Project [a#0, c#2]
+- *Filter (b#1 = 3) <---Filter before Project
   +- LocalTableScan [A#0, B#1, C#2]

You can force a shuffle and a new stage, though, and then see your filter fail, this time caught when the query is analyzed rather than at run time. Here's an example:

a_c.groupBy("a", "c").count().filter(col("b") == 3)

*There is also projection pruning, which pushes the selection down to the data-source layer if the optimizer realizes it doesn't need a column at any point. However, I believe the filter would cause it to "need" the column and prevent pruning... but I didn't test that.
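To make the pushdown idea concrete, here is a toy sketch of the optimization in plain Python. This is an illustrative model only, not Spark internals: the plan tuples, optimize, and execute functions are all hypothetical, invented for this example. The point is that moving the Filter below the Project lets it see columns the projection drops.

```python
# Toy logical plans as nested tuples; the optimizer reorders
# Filter below Project (predicate pushdown), loosely mimicking
# what Spark's Catalyst optimizer does in the plan above.

rows = [{"a": 1, "b": 3, "c": 5}, {"a": 2, "b": 4, "c": 6}]

SCAN = ("scan",)

def project(child, cols):
    return ("project", cols, child)

def filter_(child, col, val):
    return ("filter", col, val, child)

def optimize(plan):
    # Predicate pushdown: run the Filter against the full rows,
    # before the projection drops any columns.
    if plan[0] == "filter" and plan[3][0] == "project":
        _, col, val, (_, cols, child) = plan
        return project(filter_(child, col, val), cols)
    return plan

def execute(plan):
    if plan[0] == "scan":
        return [dict(r) for r in rows]
    if plan[0] == "project":
        _, cols, child = plan
        return [{c: r[c] for c in cols} for r in execute(child)]
    if plan[0] == "filter":
        _, col, val, child = plan
        return [r for r in execute(child) if r[col] == val]

# a_c = my_df.select("a", "c"); then filter on the dropped column "b"
plan = filter_(project(SCAN, ["a", "c"]), "b", 3)

# Without pushdown, execute(plan) raises KeyError: "b" is already gone.
# With pushdown, it succeeds, matching the Spark behavior shown above:
print(execute(optimize(plan)))  # → [{'a': 1, 'c': 5}]
```

A shuffle boundary (like the groupBy above) blocks this reordering, which is why the filter on "b" fails there.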

Justin Pihony answered Sep 22 '22


Let us start with some basics about Spark's underlying layer; this will make it easier to understand. RDD: at the core of Spark is a data structure called the RDD, which is lazily evaluated. By lazy evaluation we mean that the RDD computation happens only when an action is triggered (like calling count on an RDD or show on a Dataset).

A Dataset or DataFrame (which is Dataset[Row]) also uses RDDs at its core.

This means every transformation (like filter) is realized only when an action (like show) is triggered.

So, regarding your point "When I created a_c no new data was created, a_c is just pointing at the same data my_df is pointing to":

No data has been realized yet; data is realized (brought into memory) only when an action runs, so your filter effectively works against the initial DataFrame. The way to make a_c.filter(col("b") == 3).show() throw an exception is to cache the intermediate DataFrame using dataframe.cache. E.g.

a_c = my_df.select(col("a"), col("c")).cache()
a_c.filter(col("b") == 3).show()

So Spark will throw "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name.
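As a rough analogy for this laziness in plain Python (not the Spark API; select and where here are hypothetical helpers built on generators): "transformations" only compose work over an iterator, and nothing runs, including the error on the dropped column, until the "action" consumes the result.

```python
# A minimal lazy-pipeline analogy. Each "transformation" returns a
# generator; no row is touched until something iterates it, which
# plays the role of a Spark action. Illustrative only.

data = [{"a": 1, "b": 3, "c": 5}, {"a": 2, "b": 4, "c": 6}]

def select(rows, cols):
    # Lazy: returns a generator; nothing is computed yet.
    return ({c: r[c] for c in cols} for r in rows)

def where(rows, col, val):
    return (r for r in rows if r[col] == val)

a_c = select(data, ["a", "c"])   # nothing computed here
filtered = where(a_c, "b", 3)    # still nothing computed

# Only the action realizes the data, and only now does the
# missing-column error surface, since select dropped "b":
try:
    list(filtered)
except KeyError as e:
    print("fails only at action time:", e)
```

Spark's optimizer avoids this failure within a single stage by pushing the filter below the projection, as the first answer's physical plan shows; the analogy here only illustrates when work (and errors) actually happen.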

Amit Joshi answered Sep 24 '22