 

Is there a good way to join a stream in Spark with a changing table?

Our Spark environment: Databricks 4.2 (includes Apache Spark 2.3.1, Scala 2.11)

What we are trying to achieve: We want to enrich streaming data with reference data that is updated regularly. The enrichment is done by joining the stream with the reference data.

What we implemented: We implemented two Spark jobs (JARs). The first one updates a Spark table TEST_TABLE every hour (let's call it the 'reference data') by using

<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

and afterwards calling spark.catalog.refreshTable("TEST_TABLE").

The second job (let's call it 'streaming data') uses Spark Structured Streaming to read a stream of data, join it with the table TEST_TABLE via DataFrame.transform(), and write the result to another system. We read the reference data using spark.read.table("TEST_TABLE") inside the function passed to .transform(), so that we get the latest values in the table; a condensed sketch of this setup follows the stack trace below. Unfortunately, the second app crashes every time the first app updates the table. The following message is shown in the Log4j output:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
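
For reference, here is a condensed, hypothetical sketch of the setup described above (the source, join key, and paths are illustrative, not our actual code):

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// Job 1 (hourly): overwrite the reference table, then refresh the catalog.
def updateReferenceData(referenceData: DataFrame): Unit = {
  referenceData.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")
  spark.catalog.refreshTable("TEST_TABLE")
}

// Job 2 (streaming): enrich the stream by joining it with the table.
def enrich(stream: DataFrame): DataFrame =
  stream.join(spark.read.table("TEST_TABLE"), "id") // hypothetical join key

val enriched = spark.readStream
  .format("kafka")                                  // illustrative source
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS id")        // hypothetical parsing
  .transform(enrich)

enriched.writeStream
  .format("console")                                // real job writes to another system
  .option("checkpointLocation", "/tmp/checkpoints") // hypothetical path
  .start()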

We also tried invalidating the cache before reading the table, but that decreased performance, and the app crashed nevertheless. We suspect the root cause is the lazy evaluation of the reference dataset, which still 'points' to the old data that is no longer present.

Do you have any suggestions on what we could do to prevent this problem, or on what the best approach is for joining a stream with dynamic reference data?

asked Aug 23 '18 by Benedikt Beckermann

People also ask

What is the difference between Spark Streaming and structured Streaming?

Spark Streaming receives real-time data and divides it into smaller micro-batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

What is readStream in Spark?

Structured Streaming uses readStream to monitor a folder and process files as they arrive in the directory in real time, and uses writeStream to write out a DataFrame or Dataset. Structured Streaming is a scalable, high-throughput, fault-tolerant stream-processing system that supports both batch and streaming workloads.
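
For illustration, a minimal file-stream sketch (the directory paths and schema are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().getOrCreate()

// File streams require an explicit schema.
val schema = new StructType()
  .add("id", LongType)
  .add("value", StringType)

// readStream monitors the directory and picks up new files as they arrive.
val input = spark.readStream
  .schema(schema)
  .json("/data/incoming")

// writeStream continuously writes the streaming DataFrame out.
val query = input.writeStream
  .format("parquet")
  .option("path", "/data/output")
  .option("checkpointLocation", "/data/checkpoints")
  .start()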

How do you join large tables in Spark?

Spark uses sort-merge joins to join large tables. Each row is hashed on the join key on both tables, and rows with the same hash are shuffled into the same partition. There the keys are sorted on both sides and the sort-merge algorithm is applied. That's the best approach as far as I know.
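
A small sketch (table names are hypothetical); disabling the broadcast threshold makes Spark fall back to the shuffle-based sort-merge join even on modest data:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Disable broadcast joins so Spark chooses a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val orders    = spark.read.table("orders")     // hypothetical large table
val customers = spark.read.table("customers")  // hypothetical large table

// Both sides are shuffled on the join key, sorted within each partition,
// and then merged.
val joined = orders.join(customers, "customer_id")
joined.explain() // the physical plan should show SortMergeJoin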


2 Answers

Join against the reference data and do not cache it; this ensures each micro-batch goes back to the source. Look for the latest version of the data, signified by a primary key plus a counter, where the counter is closest to or equal to a counter you maintain in the streaming application. Every hour, write (append) all the reference data that is still current, but with an incremented counter, i.e. as a new version. Use Parquet here.
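
A minimal sketch of this approach, with assumed paths, schema, and counter bookkeeping (not the exact code behind this answer):

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit, max}

val spark = SparkSession.builder().getOrCreate()

val refPath = "/mnt/ref_data" // hypothetical Parquet location

// Hourly publisher: append all still-current reference rows under an
// incremented version counter; existing files are never touched.
def publishReferenceData(current: DataFrame, version: Long): Unit =
  current
    .withColumn("version", lit(version))
    .write.mode(SaveMode.Append)
    .parquet(refPath)

// Streaming side: re-read on every use (no caching) and, per key, keep the
// row whose version is closest to or equal to the counter we maintain.
def latestReference(maxVersion: Long): DataFrame = {
  val ref = spark.read.parquet(refPath).filter(col("version") <= maxVersion)
  val newest = ref.groupBy("key").agg(max("version").as("version")) // "key" = hypothetical primary key
  ref.join(newest, Seq("key", "version")) // latest row per primary key
}

Because files are only ever appended, a micro-batch that started reading an older version never sees its files disappear, which avoids the FileNotFoundException above.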

answered Oct 12 '22 by thebluephantom


Instead of joining the table with the stream, you can take advantage of a feature introduced in Spark 2.3: joining two streams. Publish the reference data as a stream instead of a table, and define a watermark on it.

Watermarks: Watermarking in Structured Streaming is a way to limit state in all stateful streaming operations by specifying how much late data to consider. Specifically, a watermark is a moving threshold in event time that trails behind the maximum event time seen by the query in the processed data. The trailing gap (aka watermark delay) defines how long the engine should wait for late data to arrive and is specified in the query using withWatermark.

Refer to the Databricks blog.
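
A minimal sketch of such a stream-stream join, loosely following the documentation examples (the sources, columns, and watermark delays are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().getOrCreate()

// Main event stream, watermarked on its event-time column.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS eventKey", "timestamp AS eventTime")
  .withWatermark("eventTime", "10 minutes")

// Reference data published as a second stream instead of a table.
val reference = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "reference")
  .load()
  .selectExpr("CAST(key AS STRING) AS refKey", "timestamp AS refTime")
  .withWatermark("refTime", "2 hours")

// Stream-stream inner join; the time-range condition bounds the state that
// Spark has to keep for each side.
val enriched = events.join(
  reference,
  expr("eventKey = refKey AND eventTime BETWEEN refTime AND refTime + interval 2 hours"))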

answered Oct 12 '22 by devesh