 

Spark DataFrame: collect() vs select()

Calling collect() on an RDD returns the entire dataset to the driver, which can cause an out-of-memory error, so we should avoid it.

Will collect() behave the same way if called on a dataframe?
What about the select() method?
Does it also work the same way as collect() if called on a dataframe?

asked May 25 '17 by Mrinal

People also ask

What does collect () do in Spark?

PySpark collect() – retrieve data from a DataFrame. collect() is an action on an RDD or DataFrame that retrieves the data from the DataFrame. It is useful for retrieving all the elements of each row from every partition of an RDD and bringing them over to the driver node/program.
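
For instance, a minimal PySpark sketch (the data and column names are made up purely for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical two-row DataFrame, for illustration only
df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ["name", "age"])

rows = df.collect()   # action: ships every row of every partition to the driver
print(rows)           # [Row(name='Alice', age=2), Row(name='Bob', age=5)]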

What is the difference between collect and show in Spark?

df.show(): displays only the contents of the DataFrame as a formatted table. df.collect(): returns the contents along with the metadata of the DataFrame, as a list of Row objects that carry the column names.
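
A quick contrast, continuing the hypothetical df from the sketch above:

df.show()             # prints a formatted table of the contents; returns None
rows = df.collect()   # returns the rows themselves as a list of Row objects
print(rows[0].name)   # 'Alice' -- each Row also carries its column names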

What can I use instead of Spark collect?

The collect action tries to move all the data in an RDD/DataFrame to the driver machine, where it may run out of memory and crash. Instead, you can make sure that the number of items returned is bounded by calling take or takeSample, or by filtering your RDD/DataFrame first, as in the sketch below.
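
A sketch of those bounded alternatives (take, takeSample, limit and filter are the actual Spark APIs; df is still the hypothetical DataFrame from the first sketch):

first_rows = df.take(10)                        # at most 10 rows reach the driver
sample = df.rdd.takeSample(False, 10, seed=42)  # random sample of up to 10 elements, no replacement
small = df.filter(df.age > 3).limit(100)        # still distributed; nothing moves yet
small_rows = small.collect()                    # now bounded: at most 100 rows come back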

What is the difference between select and selectExpr in Spark?

Therefore, the select() method is useful when you simply need to project a subset of columns from a particular Spark DataFrame. On the other hand, selectExpr() comes in handy when you need to select particular columns while at the same time also applying some sort of transformation over particular column(s).
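
For example, a sketch of the two side by side (again on the hypothetical df from above; upper is a built-in Spark SQL function):

df.select("name", "age")                                # plain projection of existing columns
df.select(df.name, (df.age + 10).alias("age_plus_10"))  # projection with a Column expression
df.selectExpr("name", "age + 10 AS age_plus_10")        # the same transformation as SQL expression strings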


2 Answers

Actions vs Transformations

  • Collect (Action) - Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
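
As a sketch of that filter-then-collect pattern (the DataFrame and predicate here are hypothetical):

small_rows = df.filter(df.age < 3).collect()   # filter first, so only a small subset reaches the driver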

spark-sql doc

select(*cols) (transformation) - Projects a set of expressions and returns a new DataFrame.

Parameters: cols – list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataFrame.

df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

Executing the select(column-name1, column-name2, ...) method on a DataFrame returns a new DataFrame that holds only the columns selected in the select() call.

E.g., assuming df has several columns, including "name" and "value", among others:

df2 = df.select("name","value") 

df2 will hold only two columns ("name" and "value") out of all the columns of df.

df2, as the result of select, stays on the executors and is not brought to the driver (as it would be when using collect()).
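
A short sketch of that distinction, assuming df has the "name" and "value" columns from the example:

df2 = df.select("name", "value")   # transformation: a new DataFrame, still distributed on the executors
rows = df2.collect()               # action: only now are the selected rows shipped to the driver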

sql-programming-guide

df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

You can run collect() on a DataFrame (spark docs):

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

spark docs

To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
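
In PySpark, the same advice would look roughly like this (rdd stands for any existing RDD; 100 is the arbitrary bound from the quote):

for elem in rdd.collect():   # brings the whole RDD to the driver -- can exhaust its memory
    print(elem)

for elem in rdd.take(100):   # safer: only a bounded number of elements cross over
    print(elem)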

answered Oct 13 '22 by Yaron

Calling select results in lazy evaluation. For example:

val df1 = df.select("col1")
val df2 = df1.filter("col1 == 3")

Both of the above statements build a lazy execution path that will only be executed when you call an action on that DataFrame, such as show, collect, etc.

val df3 = df2.collect() 

Use .explain at the end of your transformation chain to follow its plan. For more detailed info, see Transformations and Actions.
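
A PySpark rendering of the same idea (df and col1 are the hypothetical names from the Scala snippet above):

df1 = df.select("col1")            # transformation: nothing executes yet
df2 = df1.filter(df1.col1 == 3)    # still lazy
df2.explain()                      # prints the plan Spark will run, without running it
rows = df2.collect()               # action: triggers the actual computation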

answered Oct 13 '22 by elcomendante