
Spark structured streaming - join static dataset with streaming dataset

I'm using Spark structured streaming to process records read from Kafka. Here's what I'm trying to achieve:

(a) Each record is a Tuple2 of type (Timestamp, DeviceId).

(b) I've created a static Dataset[DeviceId] which contains the set of all valid device IDs (of type DeviceId) that are expected to be seen in the Kafka stream.

(c) I need to write a Spark structured streaming query that

 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, gets the list of valid device IDs that were **not** seen in that window

For example, let's say the list of all valid device IDs is [A,B,C,D,E] and the Kafka records in a certain 5-minute window contain the device IDs [A,B,E]. Then, for that window, the list of unseen device IDs I'm looking for is [C,D].
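In plain Scala terms, the per-window result is just a set difference (a minimal illustration with hard-coded values, not the streaming query itself):

val validIds  = Set("A", "B", "C", "D", "E")  // all valid device IDs
val seenIds   = Set("A", "B", "E")            // device IDs seen in one 5-minute window
val unseenIds = validIds -- seenIds           // Set("C", "D")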

Question

  1. How can this query be written in Spark structured streaming? I tried using the except() and join() methods that Dataset exposes. However, they both threw a runtime exception complaining that neither of those operations is supported on a streaming Dataset.

Here's a snippet of my code:

// Static Dataset of valid device IDs, each paired with a dummy count
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset(listOfAllDeviceIds.map(id => (id, 0L)))

case class KafkaRecord(timestamp: java.sql.Timestamp, deviceId: DeviceId)

// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
     .withWatermark("timestamp", "5 minutes")
     .groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
     .count()
     .map(row => (row.getLong(0), 1L))
     .as[(Long, Long)]

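// Right outer join: valid IDs never seen in the window should surface with a null count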
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
     .filter(row => row.isNullAt(1))
     .map(row => row.getLong(0))

The last statement throws the following exception:

Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;

Thanks in advance.

asked Oct 02 '17 by jithinpt



2 Answers

The situation with join operations in Spark Structured Streaming is as follows: streaming DataFrames can be joined with static DataFrames, producing new streaming DataFrames. However, outer joins between a streaming and a static Dataset are only conditionally supported, and a right/left outer join with the streaming Dataset on the unsupported side is not supported by Structured Streaming at all. That is why you ran into the AnalysisException: it is thrown when you try to join a static Dataset with a streaming one in an unsupported direction. You can confirm this in the Spark source code, at the line where this exception is thrown for unsupported operations.

Here is an example of joining streaming DataFrames with a static DataFrame:

val streamingDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", "structured_topic")
    .load()

val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

import spark.implicits._
val staticDf = Seq((1507831462, 100)).toDF("Timestamp", "DeviceId")

// Inner join (stream-static inner joins are supported)
streamingDf.join(staticDf, "Timestamp")
lines.join(staticDf, "Timestamp")

// Left outer join with the streaming Dataset on the left (supported)
streamingDf.join(staticDf, Seq("Timestamp"), "left_outer")
lines.join(staticDf, Seq("Timestamp"), "left_outer")

As you can see, in addition to consuming data from Kafka, I also read data from a socket launched via netcat (nc -lk 9999); this significantly simplifies life when testing a streaming app. This approach works fine for me with both Kafka and a socket as the data source.

Hope that helps.

answered Sep 18 '22 by Artem


Outer joins with the streaming Dataset on the unsupported side are simply not supported:

  • Outer joins between a streaming and a static Datasets are conditionally supported.
    • Full outer join with a streaming Dataset is not supported
    • Left outer join with a streaming Dataset on the right is not supported
    • Right outer join with a streaming Dataset on the left is not supported

If the other Dataset is small, you can use a Map or similar structure, broadcast it, and reference it inside a UserDefinedFunction.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.udf

// Assuming DeviceId is a String: broadcast the valid-ID map to the executors
val map: Broadcast[Map[String, Long]] = spark.sparkContext.broadcast(listOfAllDeviceIds.map(id => (id, 0L)).toMap)
val lookup = udf((x: String) => map.value.get(x))

df.withColumn("foo", lookup($"_1"))
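A minimal usage sketch, reusing kafkaRecs from the question and lookup from above (names are illustrative): records for which the lookup yields null come from device IDs outside the broadcast set.

// Flag stream records whose deviceId is not among the broadcast valid IDs
val unknownDevices = kafkaRecs
    .withColumn("known", lookup($"deviceId"))
    .filter($"known".isNull)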
answered Sep 18 '22 by user8762155