 

Select Specific Columns from Spark DataFrame

I have loaded CSV data into a Spark DataFrame.

I need to slice this dataframe into two different dataframes, where each one contains a set of columns from the original dataframe.

How do I select a subset of columns into a new Spark DataFrame?

Asked Aug 04 '18 by A.HADDAD

People also ask

How do I select certain columns in Spark DataFrame?

You can select single or multiple columns of a Spark DataFrame by passing the column names you want to select to the select() function. Since DataFrames are immutable, this creates a new DataFrame with the selected columns. The show() function is used to display the DataFrame contents.
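
For instance, a minimal sketch in Scala, assuming a DataFrame df with columns colA, colB and colC (the names are illustrative):

val subset = df.select("colA", "colB") // returns a new, immutable DataFrame
subset.show()                          // prints only the selected columns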

How do I select a specific column in PySpark?

In PySpark we can select columns using the select() function. The select() function allows us to select single or multiple columns in different formats.

How do I get columns from Spark DataFrame?

In order to convert a Spark DataFrame column to a List, first select() the column you want, next use the Spark map() transformation to convert each Row to a String, and finally collect() the data to the driver, which returns an Array[String].
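
A sketch of that recipe in Scala, assuming a DataFrame df with a string column "name" (both are illustrative):

import spark.implicits._ // provides the encoders needed by map()

val names: Array[String] = df.select("name")
  .map(row => row.getString(0)) // Row -> String
  .collect()                    // bring the results to the driver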

How do I select a specific column in Spark SQL?

In Spark SQL, the select() function is used to select one or multiple columns, nested columns, a column by index, all columns, columns from a list, or columns matching a regular expression from a DataFrame. select() is a transformation in Spark and returns a new DataFrame with the selected columns. You can also alias column names while selecting.
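
A few of those variants sketched in Scala (df and the column names are illustrative; the $ syntax assumes import spark.implicits._):

df.select($"colA".alias("a"))       // alias while selecting
df.select($"address.city")          // nested struct field
df.select(df.columns(0))            // column by index
df.select(df.colRegex("`col[AB]`")) // columns matching a regular expression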


How to select columns from the pyspark Dataframe?

In this article, we will discuss how to select columns from the PySpark DataFrame. To do this we will use the select() function. Syntax: dataframe.select(parameter).show(), where dataframe is the DataFrame name and parameter is the column(s) to be selected; the show() function is used to display the selected column(s). Let's create a sample DataFrame.



2 Answers

If you want to split your DataFrame into two different ones, do two selects on it, each with the columns you want.

 val sourceDf = spark.read.csv(...)
 val df1 = sourceDf.select("first column", "second column", "third column")
 val df2 = sourceDf.select("fourth column", "fifth column", "sixth column")

Note that this of course means that sourceDf would be evaluated twice, so if it can fit into distributed memory and you use most of the columns across both DataFrames, it might be a good idea to cache it. If it has many extra columns that you don't need, you can do a select on it first to keep only the columns you will need, so that it doesn't store all that extra data in memory.
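
A minimal sketch of that advice (the column names are illustrative): trim the source down to the columns both DataFrames need, then cache the result.

val trimmed = sourceDf.select("first column", "second column", "fourth column").cache()
val df1 = trimmed.select("first column", "second column")
val df2 = trimmed.select("fourth column")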

Answered Oct 10 '22 by puhlen


There are multiple options (especially in Scala) to select a subset of columns of a DataFrame. The following lines show the options; most of them are documented in the ScalaDoc for Column:

import spark.implicits._
import org.apache.spark.sql.Column // for the Seq[Column] type annotation below
import org.apache.spark.sql.functions.{col, column, expr}

inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))

// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol

// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)

// Special cases
col("columnName.field")     // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.

// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)

The usage of $ is possible because spark.implicits._ provides an implicit class that wraps a StringContext and adds the method $, turning the interpolated string into a Column:

implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
  def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}

Typically, when you want to derive multiple DataFrames from one DataFrame, it might improve your performance if you persist the original DataFrame before creating the others. At the end you can unpersist the original DataFrame.
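
A minimal sketch of that pattern (the derived DataFrames are illustrative):

inputDf.persist()   // keep the source around across both derivations
val dfAB = inputDf.select($"colA", $"colB")
val dfC  = inputDf.select($"colC")
// ... run your actions on dfAB and dfC ...
inputDf.unpersist() // release the cached data afterwards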

Keep in mind that columns are not resolved at compile time but only when they are compared to the column names of your catalog, which happens during the analyzer phase of query execution. In case you need stronger type safety, you could create a Dataset.
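
A minimal sketch of that idea, assuming the CSV shown below (the case class Record is hypothetical):

case class Record(colA: String, colB: String, colC: String)

val ds = inputDf.as[Record] // requires import spark.implicits._
ds.map(_.colA)              // field access is now checked at compile time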

For completeness, here is the CSV to try out the above code:

// csv file:
// colA,colB,colC
// 1,"foo","bar"

val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)

// resulting DataFrame schema (output of inputDf.printSchema())
root
 |-- colA: string (nullable = true)
 |-- colB: string (nullable = true)
 |-- colC: string (nullable = true)

Answered Oct 10 '22 by Michael Heil