I'm using Spark structured streaming to process records read from Kafka. Here's what I'm trying to achieve:

(a) Each record is a Tuple2 of type (Timestamp, DeviceId).

(b) I've created a static Dataset[DeviceId] which contains the set of all valid device IDs (of type DeviceId) that are expected to be seen in the Kafka stream.
(c) I need to write a Spark structured streaming query that

(i) groups records by their timestamp into 5-minute windows,

(ii) for each window, gets the list of valid device IDs that were **not** seen in that window.

For example, let's say the list of all valid device IDs is [A,B,C,D,E] and the Kafka records in a certain 5-minute window contain the device IDs [A,B,E]. Then, for that window, the list of unseen device IDs I'm looking for is [C,D].
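To make the goal concrete: if both sides were static (batch) Datasets, this would just be a set difference. A minimal batch-only sketch, where allValidIds and idsSeenInWindow are hypothetical static Dataset[DeviceId] values for a single window:

// Batch-only illustration of the desired per-window result:
// the valid device IDs minus the device IDs actually seen in that window.
val unseenInWindow: Dataset[DeviceId] = allValidIds.except(idsSeenInWindow)
// For allValidIds = [A,B,C,D,E] and idsSeenInWindow = [A,B,E], this yields [C,D].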
Question: How can I write such a query in Spark structured streaming?

I tried the except() and join() methods that Dataset exposes. However, they both threw a runtime exception complaining that neither of those operations is supported on a streaming Dataset.

Here's a snippet of my code:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset(listOfAllDeviceIds.map(id => (id, 0L)))
case class KafkaRecord(timestamp: java.sql.Timestamp, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
  .count()
  .map(row => (row.getLong(0), 1L))
  .as[(Long, Long)]

val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
  .filter(row => row.isNullAt(1))
  .map(row => row.getLong(0))
The last statement throws the following exception:
Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
Thanks in advance.
Spark Streaming receives real-time data and divides it into small batches for the execution engine, whereas Structured Streaming is built on the Spark SQL API for stream processing. Under the hood, both APIs are optimized by the Catalyst optimizer and translated into RDDs for execution.

Structured Streaming is integrated into Spark's Dataset and DataFrame APIs; in most cases, you only need to add a few method calls to run a streaming computation. It also adds new operators for windowed aggregation and for setting parameters of the execution model (e.g. output modes).

A Structured Streaming job uses readStream to read from a source (for example files arriving in a directory, a socket, or Kafka) in near real-time, and writeStream to write the resulting DataFrame or Dataset to a sink. The engine is scalable, high-throughput and fault-tolerant, and supports both batch and streaming workloads.
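As a generic illustration of that pattern (not taken from the question or the answers below; the topic and server names are placeholders), a streaming query typically chains readStream, a windowed aggregation, and writeStream with an explicit output mode:

import org.apache.spark.sql.functions.{window, col}

// Hypothetical source: a Kafka topic whose value is a device ID.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "device_events")
  .load()
  .selectExpr("CAST(value AS STRING) AS deviceId", "timestamp")

// Windowed count per device, written to the console sink in "update" output mode.
events
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window(col("timestamp"), "5 minutes"), col("deviceId"))
  .count()
  .writeStream
  .format("console")
  .outputMode("update")
  .start()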
The situation with join operations in Spark structured streaming looks as follows: streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. But outer joins between a streaming and a static Dataset are only conditionally supported, and a right outer join with the streaming Dataset on the left, or a left outer join with it on the right, is not supported by structured streaming at all. As a result, you ran into the AnalysisException that was thrown when you tried to join the static dataset with the streaming dataset. As proof, you can look at Spark's source code, at the line where this exception is thrown: it states that the operation you attempted is not supported.
I tried making a join between a streaming DataFrame and a static DataFrame myself:
val streamingDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "structured_topic")
  .load()

val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val staticDf = Seq((1507831462, 100)).toDF("Timestamp", "DeviceId")

// Inner join
streamingDf.join(staticDf, "Timestamp")
lines.join(staticDf, "Timestamp")

// Left outer join
streamingDf.join(staticDf, Seq("Timestamp"), "left_outer")
lines.join(staticDf, Seq("Timestamp"), "left_outer")
As you can see, in addition to consuming data from Kafka I also read data from a socket started via nc (netcat), which significantly simplifies life when testing a streaming app. This approach works fine for me with both Kafka and the socket as the data source.
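For completeness, here is a minimal sketch (my own addition, not part of the snippet above) of how one of these joined streams can be started for testing, e.g. after launching the socket source with nc -lk 9999; each micro-batch is printed to the console sink:

// Start the left-outer-joined stream and print every micro-batch to the console.
val query = streamingDf
  .join(staticDf, Seq("Timestamp"), "left_outer")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()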
Hope that helps.
Outer joins with the streaming Dataset on the opposite side are simply not supported:
- Outer joins between a streaming and a static Datasets are conditionally supported.
- Full outer join with a streaming Dataset is not supported
- Left outer join with a streaming Dataset on the right is not supported
- Right outer join with a streaming Dataset on the left is not supported
If the other Dataset is small, you can put it in a Map or a similar structure, broadcast it, and reference it inside a UserDefinedFunction.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.udf

val map: Broadcast[Map[T, U]] = ???                 // small static lookup table, broadcast to the executors
val lookup = udf((x: T) => map.value.get(x))        // UDF that looks each key up in the broadcast map
df.withColumn("foo", lookup($"_1"))
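Applied to the windowed problem from the question, a minimal sketch (an assumption on my part, not a verified solution) could broadcast the valid IDs, collect the IDs seen per window with collect_set, and diff the two inside a UDF. Names such as allDeviceIds follow the question, kafkaRecs is the streaming Dataset from the question, and String device IDs are assumed for illustration:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.{collect_set, udf, window}

// Assumed: the full list of valid device IDs, broadcast to the executors.
val allDeviceIds: Broadcast[Seq[String]] =
  spark.sparkContext.broadcast(Seq("A", "B", "C", "D", "E"))

// UDF computing "valid IDs minus the IDs actually seen in the window".
val unseen = udf((seen: Seq[String]) => allDeviceIds.value.diff(seen))

val unseenPerWindow = kafkaRecs
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .agg(collect_set($"deviceId").as("seenIds"))
  .withColumn("unseenIds", unseen($"seenIds"))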