I have loaded CSV data into a Spark DataFrame.
I need to slice this DataFrame into two different DataFrames, where each one contains a subset of the columns of the original.
How do I select a subset of columns from a Spark DataFrame into a new DataFrame?
You can select one or more columns of a Spark DataFrame by passing the column names you want to the select() function. select() is a transformation: since DataFrames are immutable, it returns a new DataFrame containing only the selected columns, while show() displays the DataFrame contents. select() can take single or multiple columns, nested columns, columns by index, columns from a list, or columns matched by a regular expression, and you can alias column names while selecting. The syntax is dataframe.select(parameter).show(), where dataframe is the DataFrame name and parameter is the column(s) to be selected. Relatedly, to convert a Spark DataFrame column to a list, first select() the column you want, then use the map() transformation to convert each Row to a String, and finally collect() the data to the driver, which returns an Array[String].
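A minimal sketch of that column-to-list conversion, assuming a DataFrame df with a string column named "name":
import spark.implicits._
// Select the column, map each Row to a String, then collect to the driver
val names: Array[String] = df.select("name")
  .map(row => row.getString(0))
  .collect()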
If you want to split your DataFrame into two different ones, do two selects on it with the different columns you want.
val sourceDf = spark.read.csv(...)
val df1 = sourceDf.select("first column", "second column", "third column")
val df2 = sourceDf.select("fourth column", "fifth column")
Note that this of course means that sourceDf would be evaluated twice, so if it fits into distributed memory and you use most of its columns across both DataFrames, it might be a good idea to cache it. If it has many extra columns that you don't need, you can do a select on it first to keep only the columns you will need, so that it doesn't store all that extra data in memory.
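A minimal sketch of that caching pattern, reusing the placeholder column names above (the file path is hypothetical):
// Narrow the source to only the needed columns, then cache it so both
// selects reuse the in-memory data instead of re-reading the CSV
val sourceDf = spark.read.csv("/path/to/data.csv")
  .select("first column", "second column", "third column", "fourth column", "fifth column")
  .cache()
val df1 = sourceDf.select("first column", "second column", "third column")
val df2 = sourceDf.select("fourth column", "fifth column")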
There are multiple options (especially in Scala) to select a subset of columns of that DataFrame. The following lines show the options; most of them are documented in the ScalaDocs of Column:
import spark.implicits._
import org.apache.spark.sql.functions.{col, column, expr}
inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))
// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol
// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)
// Special cases
col("columnName.field") // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.
// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)
The usage of $ is possible because Scala provides an implicit class that converts a StringContext into a ColumnName using the method $:
implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}
Typically, when you want to derive multiple DataFrames from one DataFrame, it might improve performance to persist the original DataFrame before creating the others. At the end you can unpersist the original DataFrame.
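A minimal sketch of that persist/unpersist pattern, using the inputDf and column names from the CSV example below:
import spark.implicits._
// Persist once, derive both DataFrames, then release the cached data
inputDf.persist()
val dfAB = inputDf.select($"colA", $"colB")
val dfC = inputDf.select($"colC")
// ... work with dfAB and dfC ...
inputDf.unpersist()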
Keep in mind that columns are not resolved at compile time but only when they are compared against the column names of your catalog, which happens during the analyzer phase of query execution. If you need stronger type safety, you could create a Dataset.
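A sketch of that Dataset approach, assuming a hypothetical case class matching the CSV schema below:
import spark.implicits._
// Hypothetical case class mirroring the three string columns of the CSV
case class Record(colA: String, colB: String, colC: String)
// as[Record] fails during analysis if the schema does not match
val inputDs = spark.read.format("csv")
  .option("header", "true")
  .load(csvFilePath)
  .as[Record]
// Typed projection: the field names are checked by the compiler
val pairs = inputDs.map(r => (r.colA, r.colB))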
For completeness, here is the CSV to try out the above code:
// csv file:
// colA,colB,colC
// 1,"foo","bar"
val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)
// resulting DataFrame schema
root
|-- colA: string (nullable = true)
|-- colB: string (nullable = true)
|-- colC: string (nullable = true)
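Tying this back to the question, splitting that inputDf into two DataFrames by columns is then just two selects, for instance:
val dfAB = inputDf.select("colA", "colB")
val dfC = inputDf.select("colC")
dfAB.show()
dfC.show()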