 

Change nullable property of column in Spark DataFrame

I'm manually creating a dataframe for some testing. The code to create it is:

case class input(id: Long, var1: Int, var2: Int, var3: Double)

val inputDF = sqlCtx
  .createDataFrame(List(
    input(1110, 0, 1001, -10.00),
    input(1111, 1, 1001, 10.00),
    input(1111, 0, 1002, 10.00)))

So the schema looks like this:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

I want to make 'nullable = true' for each one of these variables. How do I declare that from the start, or switch it in a new DataFrame after it's been created?

asked Oct 18 '15 by J Calbreath



2 Answers

Answer

With the imports

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

you can use

/**
 * Set the nullable property of a column.
 * @param df source DataFrame
 * @param cn the name of the column to change
 * @param nullable the flag to set, such that the column is either nullable or not
 */
def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
  // get the current schema
  val schema = df.schema
  // modify the [[StructField]] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply the new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}

directly.

You can also make the method available via the "pimp my library" pattern (see my SO post "What is the best way to define custom methods on a DataFrame?"), so that you can call

val df = ....
val df2 = df.setNullableStateOfColumn("id", true)
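For reference, a minimal sketch of that pattern (the wrapper name RichDataFrame is just illustrative; it assumes the same imports as above):

implicit class RichDataFrame(df: DataFrame) {
  // expose setNullableStateOfColumn as if it were a method on DataFrame
  def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
    val newSchema = StructType(df.schema.map {
      case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
}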

Edit

Alternative solution 1

Use a slightly modified version of setNullableStateOfColumn:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  // get the current schema
  val schema = df.schema
  // set the nullable flag on every StructField
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) => StructField(c, t, nullable = nullable, m)
  })
  // apply the new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
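For example, to relax all columns of the inputDF from the question at once (assuming the function above is in scope):

val nullableDF = setNullableStateForAllColumns(inputDF, true)
nullableDF.printSchema  // every column now reports nullable = true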

Alternative solution 2

Explicitly define the schema. (Or use reflection on the case class members to construct a more general solution.)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.types._

  case class Input(id: Long, var1: Int, var2: Int, var3: Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._

  // use this to set the schema explicitly, or
  // use reflection on the case class members to construct the schema
  val schema = StructType(Seq(
    StructField("id", LongType, true),
    StructField("var1", IntegerType, true),
    StructField("var2", IntegerType, true),
    StructField("var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001, -10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] = sparkContext.parallelize(is)
  val rowRDD: RDD[Row] = rdd.map((i: Input) => Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame(rowRDD, schema)

  inputDF.printSchema
  inputDF.show()
}
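With the schema above, printSchema should then report every column as nullable:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = true)
 |-- var3: double (nullable = true)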
answered Sep 28 '22 by Martin Senne


Another option: if you need to change the DataFrame in place and recreating it is impossible, you can do something like this:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null))) 

Spark will then assume that this column may contain null, and nullability will be set to true. You can also use a UDF to wrap your values in Option. This works fine even for streaming cases.
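A minimal sketch of the UDF variant, here for an integer column; the column name and DataFrame are placeholders:

import org.apache.spark.sql.functions.{col, udf}

// wrapping the value in Option makes the UDF's result column nullable
val toNullableInt = udf((v: Int) => Option(v))
val df2 = df.withColumn("var1", toNullableInt(col("var1")))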

answered Sep 28 '22 by Rayan Ral