Trying to test Spark Structured Streaming ...and failing... how can I test it properly?
I followed the general Spark testing question from here, and my closest attempt [1] looks something like this:
import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode
class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")
        .schema(schema)
        .csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
        .coalesce(1)

      val countSource = sourceDF.count()

      val query = sourceDF.writeStream
        .format("memory")
        .queryName("Output")
        .outputMode(OutputMode.Append())
        .start()
        .processAllAvailable()

      assert(countSource === 70)
    }
  }
}
Sadly it always fails with org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
I also found this issue at the spark-testing-base repo and wonder if it is even possible to test Spark Structured Streaming?
I want to have integration tests, and maybe even use Kafka on top for testing checkpointing or specific corrupt-data scenarios. Can someone help me out?
Last but not least, I figured the version may also be a constraint: I currently develop against 2.1.0, which I need because of the Azure HDInsight deployment options. Self-hosting is an option if the version is the blocker.
Spark Structured Streaming allows near real-time computation over streaming data on top of the Spark SQL engine, producing aggregates or other output according to the logic you define. The streaming data can be read from files, sockets, or sources such as Kafka.
This leads to a stream processing model that is very similar to the batch processing model: you express your streaming computation as a standard batch-like query, as if it were running on a static table, and Spark runs it as an incremental query on the unbounded input table.
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data as well as streaming, unbounded data. Just as with static Datasets/DataFrames, you use the common entry point SparkSession to create streaming DataFrames/Datasets from streaming sources and apply the same operations to them as you would to static DataFrames/Datasets.
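To make the model concrete, here is a minimal sketch adapted from the standard word-count example in the Spark programming guide (the app name, host and port are placeholders): a streaming DataFrame created from a socket source is transformed with the same operations you would use on a static one, and the incremental query only starts running once writeStream.start() is called.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// Unbounded input table: one row per line arriving on the socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Standard batch-like query over the streaming DataFrame
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Nothing executes until the query is started
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()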
Did you solve this?
You are calling count() on a streaming DataFrame before starting the execution with start(). If you want a count, how about doing this instead?
sourceDF.writeStream
  .format("memory")
  .queryName("Output")
  .outputMode(OutputMode.Append())
  .start()
  .processAllAvailable()

// requires import org.apache.spark.sql.Row
val results: Array[Row] = spark.sql("select * from Output").collect()
assert(results.length === 70)
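If you want a self-contained test that does not depend on files on disk, one option is to feed the query from a MemoryStream. This is a sketch, assuming Spark 2.x and a SparkSession named spark provided by your test wrapper; the query name "TestOutput" and the test data are made up. MemoryStream lives in the internal org.apache.spark.sql.execution.streaming package, but it is what Spark's own structured-streaming tests use.

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.MemoryStream

import spark.implicits._                 // Encoder for the element type
implicit val sqlCtx = spark.sqlContext   // MemoryStream needs an implicit SQLContext

// In-memory streaming source that the test controls directly
val input = MemoryStream[String]
input.addData("station-1", "station-2", "station-3")

val query = input.toDF()
  .writeStream
  .format("memory")
  .queryName("TestOutput")
  .outputMode("append")
  .start()

// Block until everything added so far has been processed
query.processAllAvailable()

val results: Array[Row] = spark.sql("select * from TestOutput").collect()
assert(results.length === 3)

query.stop()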