 

Outer join two Datasets (not DataFrames) in Spark Structured Streaming

I have some code that joins two streaming DataFrames and outputs to console.
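
Here df1Input and df2Input are just streaming DataFrames with an id and a timestamp column. For a self-contained repro, the built-in rate source works as a stand-in:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.appName("stream-stream-join").getOrCreate()

// Stand-in inputs: the rate source emits (timestamp, value) rows;
// value is renamed to id so the schema matches the join condition below.
val df1Input = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
  .withColumnRenamed("value", "id")
val df2Input = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
  .withColumnRenamed("value", "id")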

val dataFrame1 =
  df1Input.withWatermark("timestamp", "40 seconds").as("A")

val dataFrame2 =
  df2Input.withWatermark("timestamp", "40 seconds").as("B")

val finalDF: DataFrame = dataFrame1.join(dataFrame2,
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour")
      , joinType = "leftOuter")
finalDF.writeStream.format("console").start().awaitTermination()

What I now want is to refactor this part to use Datasets, so I can have some compile-time checking.
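
Here A and B are case classes whose fields line up with the join condition; simplified, they look something like this:

import java.sql.Timestamp

// Simplified/hypothetical versions of the real classes: the join
// condition only needs an id and an event-time timestamp.
case class A(id: Long, timestamp: Timestamp)
case class B(id: Long, timestamp: Timestamp)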

So what I tried was pretty straightforward:

val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour")
      , joinType = "leftOuter")
finalDS.writeStream.format("console").start().awaitTermination()

However, this gives the following error:

org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;

As you can see, the join code hasn't changed, so there is still a watermark on both sides and a range condition. The only change was switching from the DataFrame API to the Dataset API.

Also, it works fine when I use an inner join:

val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour"))
finalDS.writeStream.format("console").start().awaitTermination()

Does anyone know why this happens?

asked Jul 09 '18 by Shikkou


1 Answer

Well, when you use the joinWith method instead of join, you rely on a different implementation, and it seems that this implementation does not support leftOuter joins for streaming Datasets.

You can check the "Outer Joins with Watermarking" section of the official documentation. Note that it uses join, not joinWith, and that the result type will be a DataFrame. That means you will most likely have to map the fields manually:

val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
    expr(
      "A.id = B.id" +
        " AND " +
        "B.timestamp >= A.timestamp " +
        " AND " +
        "B.timestamp <= A.timestamp + interval 1 hour"),
    joinType = "leftOuter").select(/* useful fields */).as[C]
answered Sep 19 '22 by addmeaning