How to add a column to Dataset without converting from a DataFrame and accessing it?

I am aware that a new column can be added to a Spark Dataset using .withColumn() and a UDF, which returns a DataFrame. I am also aware that the resulting DataFrame can be converted back to a Dataset.

My questions are:

  1. How does the Dataset's type safety come into play here if we are still following the traditional DataFrame approach (i.e. passing column names as strings for the UDF's input)?
  2. Is there an "object-oriented" way of accessing columns (without passing column names as strings), as we used to do with RDDs, for appending a new column?
  3. How do I access the new column in normal operations such as map, filter, etc.?

For example:

    scala> case class Temp(a : Int, b : String)    //creating case class
    scala> val df = Seq(Temp(1,"1str"), Temp(2,"2str"), Temp(3,"3str")).toDS    // creating DS
    scala> val appendUDF = udf( (b : String) => b + "ing")      // sample UDF

    scala> df.withColumn("c",df("b"))   // adding a new column
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

    scala> res5.as[Temp]   // converting to DS
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

    scala> res6.map( x =>x.  
    // list of autosuggestion :
    a   canEqual   equals     productArity     productIterator   toString   
    b   copy       hashCode   productElement   productPrefix 

The new column c that I added using .withColumn() is not accessible, because c is not in the case class Temp (which contains only a and b) at the point when the DataFrame is converted to a Dataset using res5.as[Temp].

How can I access column c?

asked Nov 15 '16 11:11 by vdep


1 Answer

In the type-safe world of Datasets, you map one structure into another.

That is, each transformation needs a schema representation of the data it produces (just as with RDDs). To access 'c' above, we need to define a new schema, i.e. a case class, that provides access to it.

    case class A(a: String)
    case class BC(b: String, c: String)
    val f: A => BC = a => BC(a.a, "c") // Transforms an A into a BC

    val data = (1 to 10).map(i => A(i.toString))
    val dsa = spark.createDataset(data)
    // dsa: org.apache.spark.sql.Dataset[A] = [a: string]

    val dsb = dsa.map(f)
    // dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]
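
Applied to the Temp example from the question, the same idea could look roughly like the sketch below. TempWithC is a hypothetical name for the widened case class, and the local SparkSession is an assumption for running it standalone (in spark-shell, skip the two lines that build the session and import the implicits). The second route keeps .withColumn() but converts to the wider case class instead of Temp; it uses the built-in concat and lit functions in place of the question's UDF.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, concat, lit}

// Case classes are declared at the top level so Spark can derive encoders for them.
case class Temp(a: Int, b: String)
// Hypothetical widened schema that also exposes the new column c.
case class TempWithC(a: Int, b: String, c: String)

// Assumption: a local session for a standalone run; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("add-column-sketch").getOrCreate()
import spark.implicits._

val ds: Dataset[Temp] = Seq(Temp(1, "1str"), Temp(2, "2str"), Temp(3, "3str")).toDS()

// Route 1: fully typed. Map each Temp to a TempWithC; no column names as strings.
val typed: Dataset[TempWithC] = ds.map(t => TempWithC(t.a, t.b, t.b + "ing"))

// Route 2: untyped withColumn (returns a DataFrame), then convert to the wider case class.
val viaColumn: Dataset[TempWithC] =
  ds.withColumn("c", concat(col("b"), lit("ing"))).as[TempWithC]

// Either way, c is now a regular field in typed operations such as map and filter.
val filtered: Dataset[TempWithC] = typed.filter(_.c.startsWith("2"))
val cValues: Array[String] = viaColumn.map(_.c).collect()
```

With route 1 the compiler checks every field access. With route 2 the column name "c" is still a string, so a mismatch with TempWithC only surfaces at analysis time, when .as[TempWithC] is called.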
answered Oct 09 '22 05:10 by maasg