Spark SQL's where clause excludes null values

I am trying to run queries with Apache Spark SQL. The first query works fine, but the second query also removes null values.

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

def main(args: Array[String]) {

    val sc = new SparkContext("local[*]", "Spark")
    val sqlContext = new SQLContext(sc)

    val pageViewsDF = getDataframe(sc, sqlContext)

    println("RUNNING SQL QUERIES ")

    sqlContext.sql("select name , count(*) from pageviews_by_second group by name").show(10)

    sqlContext.sql("select name , count(*) from pageviews_by_second where name not in (\"Rose\") group by name").show(10)

  }

  def getDataframe(sc: SparkContext, sqlContext: SQLContext): DataFrame = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val dataArray = List(List("David", null),
      List("David", null),
      List("Charlie", "23"),
      List("Rose", null),
      List("Ben", null),
      List("Harry", "43"),
      List(null, "25"),
      List(null, "21"),
      List("David", "15"),
      List("Rose", null),
      List("Alan", "26"))
    val separator = ","

    // Create an RDD
    val dataRDD = sc.parallelize(dataArray)

    // The schema is encoded in a string
    val header = "name,age"

    // Import Spark SQL Row and the data types
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        header.split(separator).map { fieldName =>
          StructField(fieldName, StringType, true)
        })

    val rowRDD =
      dataRDD
        .map(p => Row(p(0), p(1)))

    // Apply the schema to the RDD.
    val df = sqlContext.createDataFrame(rowRDD, schema)

    df.registerTempTable("pageviews_by_second")

    df
  }

The result of the first query is:

+-------+---+
|   name|_c1|
+-------+---+
|   Alan|  1|
|    Ben|  1|
|  David|  3|
|Charlie|  1|
|   Rose|  2|
|  Harry|  1|
|   null|  2|
+-------+---+

And the output of the second query:

+-------+---+
|   name|_c1|
+-------+---+
|   Alan|  1|
|    Ben|  1|
|  David|  3|
|Charlie|  1|
|  Harry|  1|
+-------+---+

In the second query I am excluding only "Rose", but "null" is also getting excluded.

If my query is wrong, please help me with the correct query.


bob


2 Answers

It happens because NULL in SQL is equivalent to "unknown". This means that any comparison with NULL, other than IS NULL / IS NOT NULL, is undefined and returns NULL.

case class Record(id: Integer, value: String)

val df = sc.parallelize(Seq(Record(1, "foo"), Record(2, null))).toDF
df.registerTempTable("df")

sqlContext.sql("""SELECT value = "foo" FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |null|
// +----+

sqlContext.sql("""SELECT value != "foo" FROM df""").show
// +-----+
// |  _c0|
// +-----+
// |false|
// | null|
// +-----+

Because of that, IN / NOT IN is undefined as well:

sqlContext.sql("""SELECT value IN ("foo", "bar")  FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |null|
// +----+
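The DataFrame API behaves the same way; a minimal sketch, assuming the same df as above:

import sqlContext.implicits._

// Column.isin follows the same three-valued logic, so negating it
// still yields null (not true or false) for the row whose value is null
df.select($"value", !$"value".isin("foo", "bar")).show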

This is standard SQL behavior, and a system that correctly implements the SQL standard should behave the same way. If you want to filter and keep NULLs, you have to make it explicit:

sqlContext.sql(
  """SELECT value IN ("foo", "bar") OR value IS NULL FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |true|
// +----+
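Applied to the query from the question, a sketch of the same explicit form (using the pageviews_by_second table registered above):

sqlContext.sql("""
  SELECT name, count(*) FROM pageviews_by_second
  WHERE name NOT IN ("Rose") OR name IS NULL
  GROUP BY name""").show(10)
// only "Rose" is dropped; the null group is kept explicitly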

zero323


If you want to switch back to two-valued logic, you need to COALESCE the nullable column (COALESCE is the standard SQL equivalent of the proprietary NVL) to some non-existing value.

Using the same setup as @zero323's answer above:

sqlContext.sql("""SELECT value, coalesce(value,'other baz') = "foo" FROM df""").show

+-----+-----+
|value|  _c1|
+-----+-----+
|  foo| true|
| null|false|
+-----+-----+
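Applied to the query from the question, the same idea can be used in the WHERE clause; 'none' below is just an arbitrary placeholder assumed not to occur in the real data:

sqlContext.sql("""
  SELECT name, count(*) FROM pageviews_by_second
  WHERE coalesce(name, 'none') NOT IN ("Rose")
  GROUP BY name""").show(10)
// null names are compared as 'none', so they pass the filter
// and still appear as null in the grouped output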

Marmite Bomber