 

How do I detect if a Spark DataFrame has a column

When I create a DataFrame from a JSON file in Spark SQL, how can I tell if a given column exists before calling .select?

Example JSON schema:

{   "a": {     "b": 1,     "c": 2   } } 

This is what I want to do:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

but I can't find a good function for hasColumn. The closest I've gotten is to test if the column is in this somewhat awkward array:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
asked Mar 09 '16 by ben


People also ask

How do you check columns in PySpark?

You can find all column names and data types (DataType) of a PySpark DataFrame by using df.dtypes and df.schema, and you can also retrieve the data type of a specific column by name using df.schema["name"].
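For instance (a minimal sketch, assuming the interactive spark session that the pyspark shell provides; the DataFrame and its column names are made up for illustration):

df = spark.createDataFrame([("alice", 1)], ["name", "age"])

df.dtypes                   # [('name', 'string'), ('age', 'bigint')]
df.schema                   # StructType with one StructField per column
df.schema["name"].dataType  # StringType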

How do I get a list of columns in Spark DataFrame?

You can get all the columns of a Spark DataFrame by using df.columns; it returns an array of column names as Array[String].
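A quick sketch (same assumed spark session and made-up DataFrame as above; in PySpark the result is a plain Python list rather than Array[String]):

df = spark.createDataFrame([("alice", 1)], ["name", "age"])
df.columns            # ['name', 'age']
"name" in df.columns  # True -- which already answers the top-level case of the question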

How do I show specific columns in Spark?

You can select single or multiple columns of a Spark DataFrame by passing the column names you want to select to the select() function. Since DataFrames are immutable, this creates a new DataFrame with the selected columns. The show() function is used to display the DataFrame contents.
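For example (again a sketch with the same made-up DataFrame):

df = spark.createDataFrame([("alice", 1)], ["name", "age"])
df.select("name").show()         # one column
df.select("name", "age").show()  # several columns; select returns a new DataFrame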

How do I check my Spark data frame?

You can visualize a Spark DataFrame in Jupyter notebooks by using the display(<dataframe-name>) function. The display() function is supported only on PySpark kernels. The Qviz framework supports 1000 rows and 100 columns. By default, the DataFrame is visualized as a table.


2 Answers

Just assume it exists and let it fail with Try. Plain and simple, and it supports arbitrary nesting:

import scala.util.Try
import org.apache.spark.sql.DataFrame

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

val df = sqlContext.read.json(sc.parallelize(
  """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))

hasColumn(df, "foobar")         // Boolean = false
hasColumn(df, "foo")            // Boolean = true
hasColumn(df, "foo.bar")        // Boolean = true
hasColumn(df, "foo.bar.foobar") // Boolean = true
hasColumn(df, "foo.bar.foobaz") // Boolean = false

Or even simpler:

val columns = Seq(
  "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")

columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
//   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)

Python equivalent:

from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False


df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()

has_column(df, "foobar")
## False

has_column(df, "foo")
## True

has_column(df, "foo.bar")
## True

has_column(df, "foo.bar.foobar")
## True

has_column(df, "foo.bar.foobaz")
## False
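Tying this back to the question's a.b / a.c example, the same helper drives the select (a sketch; filename is the asker's JSON path and is not defined here):

potential_columns = ["b", "c", "d"]
df = sqlContext.read.json(filename)

# keep only the paths that actually resolve, then select them
existing = ["a." + c for c in potential_columns if has_column(df, "a." + c)]
df.select(existing)  # selects a.b and a.c; the missing a.d was filtered out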
answered Oct 14 '22 by zero323


Another option, which I normally use, is

df.columns.contains("column-name-to-check") 

This returns a Boolean. Note that df.columns only lists top-level columns, so unlike the Try approach above it won't match nested paths like a.b.
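The PySpark analogue is a plain membership test (a sketch; the nested variant reuses the asker's df.select("a.*") trick from the question):

"column-name-to-check" in df.columns  # top-level columns only
"b" in df.select("a.*").columns       # fields nested under the struct column a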

answered Oct 14 '22 by Jai Prakash