
Spark structured streaming and filters

In Spark 2.1, structured streaming with simple aggregations like count(*) and sum(field) works fine on top of parquet files, but filtering does not. Sample code:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.types._

val userSchema = new StructType()
  .add("caseId", StringType)
  .add("ts", LongType)
  .add("rowtype", StringType)
  .add("rowordernumber", IntegerType)
  .add("parentrowordernumber", IntegerType)
  .add("fieldname", StringType)
  .add("valuestr", StringType)

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

csvDF.createOrReplaceTempView("tmptable")
val aggDF = spark.sql("select count(*) from tmptable where rowtype='3600'")

aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()


// Exiting paste mode, now interpreting.

+--------+
|count(1)|
+--------+
+--------+

import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(caseId,StringType,true), StructField(ts,LongType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [caseId: string, ts: bigint ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [count(1): bigint]

-------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
|       0|
+--------+

I've also tried non-SQL style (DataFrame API) filtering: val aggDF = csvDF.filter("rowtype == '3600'").agg(count("caseId"))
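Spelled out, that attempt looks like this; a minimal sketch using the same userSchema and input folder as above (col and count come from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.{col, count}

// same schema and input folder as in the paste above
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

// filter first, then aggregate -- equivalent to count(*) ... where rowtype = '3600'
val aggDF = csvDF.filter(col("rowtype") === "3600").agg(count("caseId"))

aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()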

That didn't work either, even though I've checked the parquet files and there are rows where rowtype='3600':

[root@sandbox ~]# spark-sql
SPARK_MAJOR_VERSION is set to 2, using Spark2
spark-sql> select count(*) from tab1 where rowtype='3600' ;
433698463
asked Jul 31 '17 by Triffids


1 Answer

You are not required to specify your own schema when your data is static. In this case Spark can figure out the schema of your parquet dataset on its own. E.g.:

case class Foo(lowercase: String, upperCase: String)
val df = spark.createDataset(List(Foo("abc","DEF"), Foo("ghi","JKL")))
df.write.parquet("/tmp/parquet-test")
val rdf = spark.read.parquet("/tmp/parquet-test")
rdf.printSchema
// root
//  |-- lowercase: string (nullable = true)
//  |-- upperCase: string (nullable = true)

At this stage, subsequent SQL queries work regardless of the case used for the column name:

rdf.createOrReplaceTempView("rdf")
spark.sql("select uppercase from rdf").collect
// Array[org.apache.spark.sql.Row] = Array([DEF], [JKL])

Spark has an option, spark.sql.caseSensitive, to enable or disable case-sensitive name resolution (the default is false, i.e. case-insensitive), but the original case of the column names is still preserved when the data is written.
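The flag can be toggled at runtime if you want strict matching; a small sketch against the rdf view registered above:

spark.conf.set("spark.sql.caseSensitive", "true")
// with case sensitivity on, only the exact column name resolves
spark.sql("select upperCase from rdf").collect
// "select uppercase from rdf" would now fail with an unresolved-column error

// restore the default, case-insensitive behaviour
spark.conf.set("spark.sql.caseSensitive", "false")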

An attempt to do the same with the stream will lead to an exception:

java.lang.IllegalArgumentException: Schema must be specified when creating a streaming
  source DataFrame. If some files already exist in the directory, then depending
  on the file format you may be able to create a static DataFrame on that directory
  with 'spark.read.load(directory)' and infer schema from it.

This leaves you with the following options (a sketch of option 2 applied to the question's own path follows the list):

  1. Provide your own schema as you did (be aware though that it is case sensitive).
  2. Follow the advice in the exception and derive the schema from the data already stored in the folder:
val userSchema = spark.read.parquet("/tmp/parquet-test").schema
val streamDf = spark.readStream.schema(userSchema).parquet("/tmp/parquet-test")
  3. Tell Spark to infer the schema anyway by setting spark.sql.streaming.schemaInference to true:
spark.sql("set spark.sql.streaming.schemaInference=true")
val streamDf = spark.readStream.parquet("/tmp/parquet-test")
streamDf.createOrReplaceTempView("stream_rdf")
val query = spark.sql("select uppercase, count(*) from stream_rdf group by uppercase")
  .writeStream
  .format("console")
  .outputMode("complete")
  .start
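Applied to the question, option 2 would look something like this (the path and column names are the ones from the question; whether the filter matches depends on the actual case of the column names stored in the files):

val userSchema = spark.read.parquet("/folder1/folder2").schema
val streamDf = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

streamDf.createOrReplaceTempView("tmptable")
spark.sql("select count(*) from tmptable where rowtype='3600'")
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()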
answered Sep 26 '22 by nonsleepr