Our Spark environment: Databricks Runtime 4.2 (includes Apache Spark 2.3.1, Scala 2.11)
What we are trying to achieve: We want to enrich streaming data with reference data that is updated regularly. The enrichment is done by joining the stream with the reference data.
What we implemented:
We implemented two Spark jobs (jars):
The first one updates a Spark table TEST_TABLE (let's call it 'reference data') every hour by using
<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")
and afterwards calling
spark.catalog.refreshTable("TEST_TABLE").
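A rough sketch of that updater job, assuming (for illustration only) that the reference snapshot itself comes from a Parquet source; the source path is a placeholder:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// Hypothetical source of the current reference snapshot.
val referenceData = spark.read.parquet("/data/reference_source")

// Overwrite the table, then refresh the catalog's cached metadata.
referenceData.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")
spark.catalog.refreshTable("TEST_TABLE")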
The second job (let's call it the streaming job) uses Spark Structured Streaming to read some streaming data, join it via DataFrame.transform() with the table TEST_TABLE, and write the result to another system. We read the reference data using
spark.read.table("TEST_TABLE")
inside the function called by .transform(), so we always get the latest values from the table.
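A condensed sketch of that streaming job; the schema, join key, source, and sink are placeholders, since the question leaves them unspecified:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().getOrCreate()

// Placeholder schema for the incoming stream.
val inputSchema = new StructType()
  .add("key", StringType)
  .add("payload", StringType)

// Reads the reference table and joins it with the stream
// (intended to pick up the latest table contents).
def enrich(stream: DataFrame): DataFrame = {
  val reference = spark.read.table("TEST_TABLE")
  stream.join(reference, Seq("key"))   // the join key is an assumption
}

val enriched = spark.readStream
  .schema(inputSchema)
  .parquet("/data/incoming")           // source format/path are placeholders
  .transform(enrich)

val query = enriched.writeStream
  .format("parquet")                   // "another system" stands in as a file sink here
  .option("checkpointLocation", "/chk/enrich")
  .start("/data/enriched")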
Unfortunately, the second app crashes every time the first app updates the table. The following message is shown in the Log4j output:
18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
We also tried to invalidate the cache before reading the table, but that decreased performance and the app crashed nevertheless. We suspect the root cause is the lazy evaluation of the reference dataset, which still 'points' to the old data that is no longer present.
Do you have any suggestions on what we could do to prevent this problem, or on the best approach to joining a stream with dynamic reference data?
Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized by the Catalyst optimizer and translated into RDDs for execution under the hood.
Structured Streaming uses readStream to monitor a folder and process files as they arrive in the directory in real time, and uses writeStream to write out the resulting DataFrame or Dataset. It is a scalable, high-throughput, fault-tolerant stream processing engine that supports both batch and streaming workloads.
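For example, a minimal self-contained Structured Streaming pipeline using the built-in rate source and console sink:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The "rate" source generates test rows; the "console" sink prints each micro-batch.
val stream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

val query = stream.writeStream.format("console").start()
query.awaitTermination()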
Spark uses sort-merge joins to join large tables. This consists of hashing each row of both tables and shuffling the rows with the same hash into the same partition. There the keys are sorted on both sides and the sort-merge algorithm is applied. That's the best approach as far as I know.
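As a quick illustration, you can check which strategy the planner chose by inspecting the physical plan; the tiny tables here are made up, and broadcast is disabled only to force the sort-merge path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Disable auto-broadcast so even tiny test tables use the sort-merge strategy.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val left  = Seq((1, "a"), (2, "b")).toDF("key", "value")
val right = Seq((1, "x"), (2, "y")).toDF("key", "ref")

left.join(right, Seq("key")).explain()   // physical plan shows SortMergeJoin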
Join to the reference data and do not cache it; this ensures you always go to the source. Look for the latest version of the data, signified by a primary key plus a counter, where the counter is closest to (or equal to) a counter you maintain in the streaming application. Every hour, write (append) all the reference data that is still current, but with an incremented counter, i.e. a new version; see the sketch below. Use Parquet here.
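A sketch of that versioning scheme; the paths, column names, and counter bookkeeping are illustrative assumptions:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate()

// Hourly writer: re-append the still-current reference rows under a new version.
val currentSnapshot = spark.read.parquet("/data/reference_source")  // hypothetical source
val newVersion = 42L                                                // e.g. previous max + 1
currentSnapshot
  .withColumn("version", lit(newVersion))
  .write.mode(SaveMode.Append)
  .parquet("/data/reference_versions")

// Reader side: go to the source (no caching) and keep the latest version per key.
val ref = spark.read.parquet("/data/reference_versions")
val latest = ref.groupBy("primary_key").agg(max("version").as("version"))
val currentRef = ref.join(latest, Seq("primary_key", "version"))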
Instead of joining the table with the stream, you can take advantage of a new feature available in Spark 2.3, i.e. joining two streams. Create a stream instead of a table, with a watermark.
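A sketch of such a stream-stream join, following the pattern from the Spark 2.3 documentation; the schemas, paths, and time bounds are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types._

val spark = SparkSession.builder().getOrCreate()

// Assumed schemas for the event stream and the reference stream.
val eventSchema = new StructType()
  .add("key", StringType)
  .add("eventTime", TimestampType)
val refSchema = new StructType()
  .add("key", StringType)
  .add("refTime", TimestampType)
  .add("refValue", StringType)

val events = spark.readStream.schema(eventSchema).parquet("/data/events")
  .withWatermark("eventTime", "10 minutes")
  .as("e")
val reference = spark.readStream.schema(refSchema).parquet("/data/reference")
  .withWatermark("refTime", "2 hours")
  .as("r")

// Join each event to reference rows published within the hour before the event.
val enriched = events.join(
  reference,
  expr("e.key = r.key AND e.eventTime >= r.refTime AND e.eventTime <= r.refTime + interval 1 hour"))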
Watermarks: Watermarking in Structured Streaming is a way to limit state in all stateful streaming operations by specifying how much late data to consider. Specifically, a watermark is a moving threshold in event time that trails behind the maximum event time seen by the query in the processed data. The trailing gap (aka watermark delay) defines how long the engine should wait for late data to arrive, and is specified in the query using withWatermark.
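For example, a minimal stateful aggregation bounded by a watermark, reusing the events stream from the sketch above; the column names and window sizes are illustrative:

import org.apache.spark.sql.functions.{col, window}

// The 10-minute watermark bounds how much per-window state the engine must keep for late data.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("key"))
  .count()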
Refer to the Databricks blog for more details.