I'm manually creating a dataframe for some testing. The code to create it is:
    case class input(id:Long, var1:Int, var2:Int, var3:Double)
    val inputDF = sqlCtx
      .createDataFrame(List(input(1110,0,1001,-10.00),
                            input(1111,1,1001,10.00),
                            input(1111,0,1002,10.00)))
So the schema looks like this:
    root
     |-- id: long (nullable = false)
     |-- var1: integer (nullable = false)
     |-- var2: integer (nullable = false)
     |-- var3: double (nullable = false)
I want to make 'nullable = true' for each one of these variables. How do I declare that from the start, or switch it in a new dataframe after it's been created?
Replacing null values is one of the most common operations on PySpark DataFrames. It can be done with either DataFrame.fillna() or DataFrameNaFunctions.fill().
To change a Spark SQL DataFrame column from one data type to another, use the cast() function of the Column class; it works with withColumn(), select(), selectExpr(), and SQL expressions.
You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Because DataFrames are distributed immutable collections, you cannot change column values in place; withColumn() and the other approaches return a new DataFrame with the updated values.
With the imports
    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
you can use
    /**
     * Set nullable property of column.
     * @param df source DataFrame
     * @param cn is the column name to change
     * @param nullable is the flag to set, such that the column is either nullable or not
     */
    def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
      // get schema
      val schema = df.schema
      // modify [[StructField]] with name `cn`
      val newSchema = StructType(schema.map {
        case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
        case y: StructField => y
      })
      // apply new schema
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }
directly.
You can also make the method available via the "pimp my library" pattern (see my SO post "What is the best way to define custom methods on a DataFrame?"), so that you can call
    val df = ....
    val df2 = df.setNullableStateOfColumn( "id", true )
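For that call syntax to compile, the method has to be attached to DataFrame through an implicit wrapper. The following is a minimal sketch of how that could look, not the code from the linked post: the names `DataFrameNullability` and `NullableOps` are made up for illustration, and the body simply repeats the schema rewrite shown above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical holder object; import its members to enable the extension method.
object DataFrameNullability {

  // Hypothetical wrapper name. The implicit class adds the method to any DataFrame in scope.
  implicit class NullableOps(val df: DataFrame) extends AnyVal {

    def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
      // Copy the schema, replacing only the nullable flag of the column named `cn`.
      val newSchema = StructType(df.schema.map {
        case StructField(c, t, _, m) if c == cn => StructField(c, t, nullable, m)
        case other                              => other
      })
      // Re-create the DataFrame from the same rows with the modified schema.
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }
  }
}
```

After `import DataFrameNullability._`, the call `df.setNullableStateOfColumn("id", true)` resolves through the implicit class.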
To change all columns at once, use a slightly modified version of setNullableStateOfColumn:
    def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
      // get schema
      val schema = df.schema
      // set the nullable flag on every [[StructField]]
      val newSchema = StructType(schema.map {
        case StructField(c, t, _, m) => StructField(c, t, nullable = nullable, m)
      })
      // apply new schema
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }
Explicitly define the schema. (Use reflection for a more general solution.)
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._

    configuredUnitTest("Stackoverflow.") { sparkContext =>

      case class Input(id:Long, var1:Int, var2:Int, var3:Double)

      val sqlContext = new SQLContext(sparkContext)
      import sqlContext.implicits._

      // use this to set the schema explicitly or
      // use reflection on the case class members to construct the schema
      val schema = StructType(Seq(
        StructField("id", LongType, true),
        StructField("var1", IntegerType, true),
        StructField("var2", IntegerType, true),
        StructField("var3", DoubleType, true)
      ))

      val is: List[Input] = List(
        Input(1110, 0, 1001, -10.00),
        Input(1111, 1, 1001, 10.00),
        Input(1111, 0, 1002, 10.00)
      )

      val rdd: RDD[Input] = sparkContext.parallelize(is)
      // convert the case class instances to generic rows matching the schema
      val rowRDD: RDD[Row] = rdd.map((i: Input) => Row(i.id, i.var1, i.var2, i.var3))

      val inputDF = sqlContext.createDataFrame(rowRDD, schema)

      inputDF.printSchema
      inputDF.show()
    }
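The reflection-based variant mentioned above could be sketched roughly as follows. It assumes Spark 2.x or later (`SparkSession` and `Encoders.product`), which is newer than the `SQLContext` API used in the rest of this thread, and the names `ReflectedNullableSchema` and `nullableSchemaOf` are made up for illustration. Only top-level fields are touched; nested structs keep whatever nullability the encoder assigns.

```scala
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Declared at top level so that a TypeTag is available for schema derivation.
case class Input(id: Long, var1: Int, var2: Int, var3: Double)

// Hypothetical helper object, not part of Spark.
object ReflectedNullableSchema {

  // Derive the schema of any case class by reflection, then mark every top-level field nullable.
  def nullableSchemaOf[T <: Product : TypeTag]: StructType =
    StructType(Encoders.product[T].schema.map(_.copy(nullable = true)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumption: local run for demonstration only
      .appName("nullable-schema")
      .getOrCreate()

    // Same sample data as in the question, converted to generic rows.
    val rows = Seq(
      Input(1110, 0, 1001, -10.00),
      Input(1111, 1, 1001, 10.00),
      Input(1111, 0, 1002, 10.00)
    ).map(i => Row(i.id, i.var1, i.var2, i.var3))

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(rows),
      nullableSchemaOf[Input]
    )

    inputDF.printSchema()
    spark.stop()
  }
}
```

Compared with the hand-written StructType, this keeps the schema in sync with the case class when fields are added or renamed.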
Another option, if you need to change the dataframe in place and recreating it is impossible, is to do something like this:
    .withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
Spark will then think that this column may contain null, and nullability will be set to true. Also, you can use udf to wrap your values in Option. Works fine even for streaming cases.
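Both tricks can be tried on a small DataFrame. This is a sketch under assumptions rather than a definitive recipe: it assumes Spark 2.x or later, an integer column, and the helper names `viaWhen` and `viaUdf` are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, udf, when}

// Hypothetical helper object, not part of Spark.
object NullableViaExpression {

  // Rewrite the column as a when/otherwise expression whose fallback branch is a null literal,
  // so the resulting expression is considered nullable.
  def viaWhen(df: DataFrame, name: String): DataFrame =
    df.withColumn(name, when(col(name).isNotNull, col(name)).otherwise(lit(null)))

  // Wrap the values of an Int column in Option through a UDF (assumes the column is an integer).
  def viaUdf(df: DataFrame, name: String): DataFrame = {
    val toOption = udf((x: Int) => Option(x))
    df.withColumn(name, toOption(col(name)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumption: local run for demonstration only
      .appName("nullable-expr")
      .getOrCreate()
    import spark.implicits._

    // Columns built from Scala primitives start out as nullable = false.
    val df = Seq((1110L, 0), (1111L, 1)).toDF("id", "var1")

    df.printSchema()
    viaWhen(df, "var1").printSchema()
    viaUdf(df, "var1").printSchema()

    spark.stop()
  }
}
```

Neither helper goes through `df.rdd`, which is why this approach also suits cases where the DataFrame cannot be rebuilt from an RDD.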