
Create a DataFrame with null values for some columns

I am trying to create a DataFrame from an RDD.

First I create the RDD with the code below:

val account = sc.parallelize(Seq(
  (1, null, 2, "F"),
  (2, 2,    4, "F"),
  (3, 3,    6, "N"),
  (4, null, 8, "F")))

This works fine:

account: org.apache.spark.rdd.RDD[(Int, Any, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

but when I try to create a DataFrame from the RDD with

account.toDF("ACCT_ID", "M_CD", "C_CD", "IND")

I get the following error:

java.lang.UnsupportedOperationException: Schema for type Any is not supported

I noticed that the error occurs only when I put a null value in the Seq.

Is there any way to include null values?

asked Sep 13 '16 by Avijit

2 Answers

An alternative way, without using RDDs:

import spark.implicits._

val df = spark.createDataFrame(Seq(
  (1, None,    2, "F"),
  (2, Some(2), 4, "F"),
  (3, Some(3), 6, "N"),
  (4, None,    8, "F")
)).toDF("ACCT_ID", "M_CD", "C_CD", "IND")

df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+

df.printSchema
root
 |-- ACCT_ID: integer (nullable = false)
 |-- M_CD: integer (nullable = true)
 |-- C_CD: integer (nullable = false)
 |-- IND: string (nullable = true)
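If you do need to go through an RDD, the same Option trick works there as well. A sketch, reusing the column names from the question:

```scala
import spark.implicits._

// Option[Int] gives Spark a concrete, nullable element type,
// so toDF no longer sees the unsupported Any.
val rdd = sc.parallelize(Seq(
  (1, None: Option[Int], 2, "F"),
  (2, Some(2),           4, "F"),
  (3, Some(3),           6, "N"),
  (4, None: Option[Int], 8, "F")))

val df = rdd.toDF("ACCT_ID", "M_CD", "C_CD", "IND")
```

None values become nulls in the resulting DataFrame, and M_CD is inferred as a nullable integer column.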

answered Oct 20 '22 by Marsellus Wallace

The problem is that Any is too general a type and Spark has no idea how to serialize it. You should explicitly provide a specific type, in your case Integer. Since null can't be assigned to primitive types in Scala, you can use java.lang.Integer instead. So try this:

val account = sc.parallelize(Seq(
  (1, null.asInstanceOf[Integer], 2, "F"),
  (2, new Integer(2),             4, "F"),
  (3, new Integer(3),             6, "N"),
  (4, null.asInstanceOf[Integer], 8, "F")))

Here is the output:

account: org.apache.spark.rdd.RDD[(Int, Integer, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24

And the corresponding DataFrame:

scala> val df = account.toDF("ACCT_ID", "M_CD", "C_CD", "IND")

df: org.apache.spark.sql.DataFrame = [ACCT_ID: int, M_CD: int ... 2 more fields]

scala> df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+
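If you would rather keep literal nulls in the data instead of boxed Integers, you can also build the DataFrame from an RDD[Row] with an explicit schema. A sketch:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare nullability per column instead of encoding it in the Scala types.
val schema = StructType(Seq(
  StructField("ACCT_ID", IntegerType, nullable = false),
  StructField("M_CD",    IntegerType, nullable = true),
  StructField("C_CD",    IntegerType, nullable = false),
  StructField("IND",     StringType,  nullable = true)))

val rows = sc.parallelize(Seq(
  Row(1, null, 2, "F"),
  Row(2, 2,    4, "F"),
  Row(3, 3,    6, "N"),
  Row(4, null, 8, "F")))

val df = spark.createDataFrame(rows, schema)
```

This skips tuple type inference entirely, so the nulls never produce an Any column.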

You can also declare the null Integer value in a cleaner way, for example:

object Constants {
  val NullInteger: java.lang.Integer = null
}
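The constant can then stand in for the asInstanceOf casts above; a sketch (Integer.valueOf replaces the deprecated new Integer constructor):

```scala
object Constants {
  val NullInteger: java.lang.Integer = null
}

// The tuple element type is now Integer, which Spark can serialize.
val account = sc.parallelize(Seq(
  (1, Constants.NullInteger, 2, "F"),
  (2, Integer.valueOf(2),    4, "F"),
  (3, Integer.valueOf(3),    6, "N"),
  (4, Constants.NullInteger, 8, "F")))

val df = account.toDF("ACCT_ID", "M_CD", "C_CD", "IND")
```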
answered Oct 20 '22 by Zyoma