Spark 2.1, Structured Streaming: primitive aggregations such as count(*) and sum(field) work fine on top of Parquet files, but filtering does not. Sample code:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val userSchema = new StructType()
  .add("caseId", StringType)
  .add("ts", LongType)
  .add("rowtype", StringType)
  .add("rowordernumber", IntegerType)
  .add("parentrowordernumber", IntegerType)
  .add("fieldname", StringType)
  .add("valuestr", StringType)
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")
csvDF.createOrReplaceTempView("tmptable")
val aggDF = spark.sql("select count(*) from tmptable where rowtype='3600'")
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

aggDF
  .writeStream
  .queryName("aggregates") // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()
// Exiting paste mode, now interpreting.
+--------+
|count(1)|
+--------+
+--------+
import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(caseId,StringType,true), StructField(ts,LongType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [caseId: string, ts: bigint ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [count(1): bigint]
-------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
| 0|
+--------+
I've also tried DataFrame-style (non-SQL) filtering: val aggDF = csvDF.filter("rowtype == '3600'").agg(count("caseId"))
with no success (the full streaming attempt is written out after the spark-sql check below). I've verified that the Parquet files do contain rows where rowtype='3600':
[root@sandbox ~]# spark-sql
SPARK_MAJOR_VERSION is set to 2, using Spark2
spark-sql> select count(*) from tab1 where rowtype='3600' ;
433698463
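For completeness, the full DataFrame-style streaming attempt looked roughly like this (a sketch; same csvDF as in the paste above, with count coming from org.apache.spark.sql.functions):
import org.apache.spark.sql.functions.count

// same predicate as the SQL variant, expressed with the DataFrame API
val aggDF = csvDF.filter("rowtype == '3600'").agg(count("caseId"))

aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()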
You are not required to specify your own schema when your data is static. In this case Spark can figure out the schema of your parquet dataset on its own. E.g.:
case class Foo(lowercase: String, upperCase: String)
val df = spark.createDataset(List(Foo("abc","DEF"), Foo("ghi","JKL")))
df.write.parquet("/tmp/parquet-test")
val rdf = spark.read.parquet("/tmp/parquet-test")
rdf.printSchema
// root
// |-- lowercase: string (nullable = true)
// |-- upperCase: string (nullable = true)
At this stage, subsequent SQL queries work regardless of the column-name case:
rdf.createOrReplaceTempView("rdf")
spark.sql("select uppercase from rdf").collect
// Array[org.apache.spark.sql.Row] = Array([DEF], [JKL])
Spark has an option, spark.sql.caseSensitive, to enable/disable case sensitivity (the default value is false, i.e. the analyzer is case-insensitive), but it seems it only works on write.
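A minimal sketch of toggling the option at runtime via spark.conf.set, reusing the rdf view registered above (an illustration, not part of the original snippet):
// switch the analyzer to case-sensitive resolution for this session
spark.conf.set("spark.sql.caseSensitive", "true")
spark.sql("select upperCase from rdf").collect   // exact case resolves
// spark.sql("select uppercase from rdf")        // would now fail to resolve the column
// restore the default, case-insensitive behaviour
spark.conf.set("spark.sql.caseSensitive", "false")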
An attempt to do the same with the stream will lead to an exception:
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming
source DataFrame. If some files already exist in the directory, then depending
on the file format you may be able to create a static DataFrame on that directory
with 'spark.read.load(directory)' and infer schema from it.
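The call that triggers it looks roughly like this (a sketch, reusing the /tmp/parquet-test path from above):
// streaming read without an explicit schema and without schema inference enabled
val failingDf = spark.readStream.parquet("/tmp/parquet-test")
// => java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame ...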
This leaves you with the following options:
Either copy the schema from a static read of the same data and pass it to the stream:
val userSchema = spark.read.parquet("/tmp/parquet-test").schema
val streamDf = spark.readStream.schema(userSchema).parquet("/tmp/parquet-test")
Or set spark.sql.streaming.schemaInference to true and let the stream infer the schema itself:
spark.sql("set spark.sql.streaming.schemaInference=true")
val streamDf = spark.readStream.parquet("/tmp/parquet-test")
streamDf.createOrReplaceTempView("stream_rdf")
val query = spark.sql("select uppercase, count(*) from stream_rdf group by uppercase")
  .writeStream
  .format("console")
  .outputMode("complete")
  .start
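Applied back to the question, a minimal sketch of the second option (assuming the /folder1/folder2 path and that rowtype is the column name actually stored in the Parquet footers):
spark.sql("set spark.sql.streaming.schemaInference=true")
// let the stream pick up the schema that is actually stored in the files
val streamingCases = spark.readStream.parquet("/folder1/folder2")
streamingCases.createOrReplaceTempView("tmptable")
spark.sql("select count(*) from tmptable where rowtype = '3600'")
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()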