 

Adding StringType column to existing Spark DataFrame and then applying default values

Scala 2.10 here using Spark 1.6.2. I have a similar (but not the same) question as this one, however, the accepted answer is not an SSCCE and assumes a certain amount of "upfront knowledge" about Spark; and therefore I can't reproduce it or make sense of it. More importantly, that question is also just limited to adding a new column to an existing dataframe, whereas I need to add a column as well as a value for all existing rows in the dataframe.


So I want to add a column to an existing Spark DataFrame, and then apply an initial ('default') value for that new column to all rows.

val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)

jsonDF.show()

When I run that I get this following as output (via .show()):

+----+--------+
|   x|       y|
+----+--------+
|true|not true|
+----+--------+

Now I want to add a new field to jsonDF, after it's created and without modifying the json string, such that the resultant DF would look like this:

+----+--------+----+
|   x|       y|   z|
+----+--------+----+
|true|not true| red|
+----+--------+----+

Meaning, I want to add a new "z" column to the DF, of type StringType, and then default all rows to contain a z-value of "red".

From that other question I have pieced the following pseudo-code together:

val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)

//jsonDF.show()

val newDF = jsonDF.withColumn("z", jsonDF("col") + 1)

newDF.show()

But when I run this, I get the following runtime error (not a compiler error) from that .withColumn(...) call:

org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y);
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)

I also don't see any API methods that would allow me to set "red" as the default value. Any ideas as to where I'm going awry?

Asked Oct 10 '16 by smeeb

People also ask

How do I add a column to an existing spark DataFrame?

In PySpark, to add a new column with a constant value to a DataFrame, use the lit() function: from pyspark.sql.functions import lit. lit() takes the constant value you want to add and returns a Column type; to add a NULL / None value, use lit(None).

How do you update column values in spark?

Spark's withColumn() function on a DataFrame is used to update the value of a column. withColumn() takes 2 arguments: first the name of the column you want to update, and second the value you want to update it with.

Does withColumn replace existing column?

withColumn() is used to change a value, convert the datatype of an existing column, create a new column, and more. It returns a new DataFrame by adding a column, or by replacing an existing column that has the same name.


1 Answer

You can use the lit function. First you have to import it:

import org.apache.spark.sql.functions.lit

and then use it like this:

jsonDF.withColumn("z", lit("red"))

The type of the column will be inferred automatically from the literal (here, StringType).
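Putting it together with your snippet, a minimal end-to-end sketch against Spark 1.6 / Scala 2.10 might look like the following (it assumes an existing sparkContext and sqlContext, as in your question; the cast to StringType is redundant for a string literal but shows how you would pin the type explicitly):

```scala
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val json: String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)

// lit("red") builds a Column holding the constant "red" for every row.
// withColumn returns a NEW DataFrame; jsonDF itself is unchanged.
val newDF = jsonDF.withColumn("z", lit("red").cast(StringType))

newDF.show() // every row now has z = "red"
```

Note that your original attempt failed because jsonDF("col") asks Spark to resolve an existing column literally named "col", and the DataFrame only has columns x and y; lit builds a constant Column instead of resolving one.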

Answered Nov 14 '22 by zero323