I have a DataFrame with the schema:

root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)
While I am able to filter the DataFrame using

val data = rawData.filter(!(rawData("features.feat1") <=> "100"))
I am unable to drop the column using

val data = rawData.drop("features.feat1")
Is there something I am doing wrong here? I also tried (unsuccessfully) drop(rawData("features.feat1")), though it does not make much sense to do so.
Thanks in advance,
Nikhil
You can always get all columns with the DataFrame's .columns method, remove the unwanted column from the sequence, and do select(myColumns: _*).
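A minimal sketch of that approach, assuming a top-level column named label is the one to remove (the val names are illustrative):

val remaining = rawData.columns.filterNot(_ == "label")
val trimmed = rawData.select(remaining.map(rawData(_)): _*)

Note that this only handles top-level columns; a nested field like features.feat1 does not appear in .columns, which is the crux of the question.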
Spark's DataFrame provides the drop() method to remove one or more columns from a DataFrame or Dataset. However, drop() only resolves top-level column names; it does not interpret a dot-separated path such as "features.feat1" as a struct field, which is why the drop() call above returns the DataFrame unchanged.
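For what it's worth, on Spark 3.1 and later there is a built-in way to do this: Column.dropFields removes fields from a struct column. A minimal sketch, assuming Spark 3.1+:

import org.apache.spark.sql.functions.col

// Column.dropFields was added in Spark 3.1; on older versions,
// use one of the hand-rolled approaches below.
val data = rawData.withColumn("features", col("features").dropFields("feat1"))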
It is just a programming exercise, but you can try something like this:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {

  // Find the top-level field with the given name.
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  // Succeeds only if that field is a struct.
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  // Rebuild the struct column from the surviving field names.
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  // Drop the given fields from the struct column `source`;
  // if anything goes wrong, return the DataFrame unchanged.
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}
Example usage:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
|  label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+

scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
Add an implicit conversion and you're good to go.
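For example, a minimal sketch of such a conversion (the object name here is illustrative, not part of the original answer):

import scala.language.implicitConversions

object DFWithDropFromSyntax {
  // Lets any DataFrame be used where a DFWithDropFrom is expected.
  implicit def toDFWithDropFrom(df: DataFrame): DFWithDropFrom = DFWithDropFrom(df)
}

// After `import DFWithDropFromSyntax._` you can call
// df.dropFrom("features", Array("feat1")) directly on a DataFrame.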
This version allows you to remove nested columns at any level:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
 * Various Spark utilities and extensions of DataFrame
 */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  }) : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Usage:
import DataFrameUtils._

df.dropNestedColumn("a.b.c.d")
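Applied to the schema from the question, that would look like this (a sketch, reusing the rawData name from above):

import DataFrameUtils._

val data = rawData.dropNestedColumn("features.feat1")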