 

Dropping a nested column from Spark DataFrame

I have a DataFrame with the schema

root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)

While I am able to filter the DataFrame using

val data = rawData
  .filter( !(rawData("features.feat1") <=> "100") )

I am unable to drop the column using

val data = rawData
  .drop("features.feat1")

Is there something I am doing wrong here? I also tried (unsuccessfully) drop(rawData("features.feat1")), though it does not make much sense to do so.

Thanks in advance,

Nikhil

asked Sep 22 '15 by Nikhil J Joshi


People also ask

How do I drop a nested column in spark DataFrame?

You can always get all columns with the DataFrame's .columns method, remove the unwanted columns from the sequence, and do select(myColumns: _*).
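
For top-level columns, that approach might look like the sketch below (the column name "unwanted" is made up for illustration). Note that it does not reach inside a struct, which is exactly the limitation the question runs into:

import org.apache.spark.sql.functions.col

// Re-select every column except the one we want gone.
val keep = df.columns.filterNot(_ == "unwanted").map(col)
val trimmed = df.select(keep: _*)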

How do I drop a column in spark table?

The Spark DataFrame provides the drop() method to drop a column or field from a DataFrame or Dataset. The drop() method can also remove multiple columns at once. (A Dataset is a distributed collection of data.)
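
As a quick illustration (hypothetical column names), drop() accepts one or more names, and names that do not exist are silently ignored:

// Drop a single top-level column.
val df2 = df.drop("feat1")

// Drop several columns in one call (varargs overload, Spark 2.0+).
val df3 = df.drop("feat1", "feat2")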

What is flatten in spark?

In Spark SQL, flatten is a built-in function that converts an array-of-arrays column (a nested array, i.e. ArrayType(ArrayType(StringType))) into a single array column on the DataFrame. Spark SQL is the Spark module for structured data processing.
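
A minimal sketch of flatten (available since Spark 2.4; this assumes a SparkSession named spark):

import org.apache.spark.sql.functions.{col, flatten}

// Build an array<array<string>> column and collapse it to array<string>.
val nested = spark.sql("SELECT array(array('a', 'b'), array('c')) AS nested")
nested.select(flatten(col("nested")).alias("flat")).show()
// +---------+
// |     flat|
// +---------+
// |[a, b, c]|
// +---------+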


2 Answers

It is just a programming exercise, but you can try something like this:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {
  // Find the top-level field with the given name.
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  // The field must be a struct for there to be anything to drop from it.
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  // Rebuild the struct from the fields we want to keep.
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  // Replace the struct column with a version that excludes toDrop;
  // on any failure (missing column, not a struct) return df unchanged.
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}

Example usage:

scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
|  label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+

scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

Add an implicit conversion and you're good to go.
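
The implicit conversion itself is left as an exercise; a minimal sketch might look like this (the object and class names here are made up):

import org.apache.spark.sql.DataFrame

object DropFromSyntax {
  // Lets you call dropFrom directly on any DataFrame.
  implicit class DFOps(val df: DataFrame) extends AnyVal {
    def dropFrom(source: String, toDrop: Array[String]): DataFrame =
      DFWithDropFrom(df).dropFrom(source, toDrop)
  }
}

// import DropFromSyntax._
// df.dropFrom("features", Array("feat1"))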

answered Sep 29 '22 by zero323


This version allows you to remove nested columns at any level:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
  * Various Spark utilities and extensions of DataFrame
  */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            // Rebuild the struct, recursing into each field and dropping the target.
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}

Usage:

import DataFrameUtils._

df.dropNestedColumn("a.b.c.d")
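
For what it's worth, if you are on Spark 3.1 or later, the built-in Column.dropFields covers the simple cases without any helper (a sketch, using the field names from the question):

import org.apache.spark.sql.functions.col

// Spark 3.1+: rebuild the struct without feat1 in a single call.
val data = rawData.withColumn("features", col("features").dropFields("feat1"))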
answered Sep 29 '22 by Michael Spector