
How to write integration tests for Spark's new Structured Streaming?

I'm trying to test Spark Structured Streams... and failing. How can I test them properly?

I followed the general Spark testing question from here, and my closest attempt looks something like this:

import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec  
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode

class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")
        .schema(schema)
        .csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
        .coalesce(1)

      val countSource = sourceDF.count()

      val query = sourceDF.writeStream
        .format("memory")
        .queryName("Output")
        .outputMode(OutputMode.Append())
        .start()
        .processAllAvailable()

      assert(countSource === 70)
    }

  }

}
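
For completeness, SparkSessionTestWrapper is just a small mixin that lazily creates a local SparkSession, roughly along these lines (the master URL and app name are illustrative):

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  // one local SparkSession shared by all tests in the suite
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("structured-streaming-spec")
    .getOrCreate()
}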

Sadly it always fails with org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

I also found this issue in the spark-testing-base repo and wonder whether it is even possible to test Spark Structured Streaming.

I want to have integration tests, and maybe even use Kafka on top for testing checkpointing or specific corrupt-data scenarios. Can someone help me out?
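
One direction I am considering is driving the stream from an in-memory source instead of the file source. This is only an untested sketch on my side: MemoryStream lives in the internal org.apache.spark.sql.execution.streaming package, and spark is the session from the test wrapper above.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

// `spark` is the SparkSession provided by SparkSessionTestWrapper
implicit val sqlCtx: SQLContext = spark.sqlContext
import spark.implicits._

// an in-memory source lets the test push records deterministically
val input = MemoryStream[String]
val upperCased = input.toDS().map(_.toUpperCase)

val query = upperCased.writeStream
  .format("memory")
  .queryName("Upper")
  .outputMode(OutputMode.Append())
  .start()

input.addData("a", "b", "c")   // inject test data
query.processAllAvailable()    // block until the batch has been processed

val result = spark.table("Upper").as[String].collect().sorted
assert(result.sameElements(Array("A", "B", "C")))
query.stop()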

Last but not least, I figured the version may also be a constraint - I currently develop against 2.1.0, which I need because of the Azure HDInsight deployment options. Self-hosting is an option if that turns out to be the blocker.

asked Mar 27 '18 by lony

People also ask

What is structured streaming in Apache Spark?

Spark Structured Streaming allows near-real-time computation over streaming data on the Spark SQL engine, producing aggregates or other output according to the defined logic. This streaming data can be read from a file, a socket, or sources such as Kafka.
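
A rough sketch of those three kinds of sources (host, port, broker address, topic and path are placeholders, and the Kafka source additionally needs the spark-sql-kafka connector on the classpath):

// file source: monitors a directory for new files and needs an explicit schema
val fromFiles = spark.readStream
  .schema(schema)
  .csv("/path/to/input-dir")

// socket source: mainly useful for testing
val fromSocket = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// Kafka source
val fromKafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "some-topic")
  .load()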

What is the Spark Stream processing model?

This leads to a new stream processing model that is very similar to a batch processing model. You express your streaming computation as a standard batch-like query, as if over a static table, and Spark runs it as an incremental query over the unbounded input table.
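
In other words, the query you would write against a static table runs unchanged against a streaming DataFrame; staticDF and streamingDF here are placeholder names:

// identical batch-like query; on the streaming side Spark executes it
// incrementally over the unbounded input table
val staticCounts    = staticDF.groupBy("landmark").count()
val streamingCounts = streamingDF.groupBy("landmark").count()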

How to create streaming DataFrames/datasets in spark?

Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as on static DataFrames/Datasets.
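
For example, the same entry point produces either a static or a streaming DataFrame (path and schema are placeholders):

val staticDF    = spark.read.schema(schema).csv("/path/to/dir")        // bounded, batch
val streamingDF = spark.readStream.schema(schema).csv("/path/to/dir")  // unbounded, streaming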

What type of data can be represented in spark?

Since Spark 2.0, the same Dataset/DataFrame API can represent static, bounded data as well as streaming, unbounded data.
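
The isStreaming flag distinguishes the two; reusing the placeholder DataFrames from the previous sketch:

assert(!staticDF.isStreaming)   // bounded data
assert(streamingDF.isStreaming) // unbounded data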




1 Answer

Did you solve this?

You are calling count() on a streaming DataFrame before starting the query with start(). If you want a count, how about doing this?

  sourceDF.writeStream
    .format("memory")
    .queryName("Output")
    .outputMode(OutputMode.Append())
    .start()
    .processAllAvailable()

  // the memory sink exposes the results as an in-memory table named "Output";
  // collectAsList() returns a java.util.List[Row], so size() works here
  val results = spark.sql("select * from Output").collectAsList()
  assert(results.size() === 70)
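
A possible refinement (my addition, not part of the original answer): keep the handle returned by start() so the test can stop the query once the assertion has run:

  val query = sourceDF.writeStream
    .format("memory")
    .queryName("Output")
    .outputMode(OutputMode.Append())
    .start()

  try {
    query.processAllAvailable()
    assert(spark.table("Output").count() === 70)
  } finally {
    query.stop()   // shut down the streaming query and its memory sink
  }
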
answered Oct 15 '22 by Sumeeth