I have a simple Spark Structured Streaming app that reads from Kafka and writes to HDFS. Today the app has mysteriously stopped working, with no changes or modifications whatsoever (it had been working flawlessly for weeks).
So far, everything I have observed looks normal, yet nothing is being written to HDFS anymore. Code snippet:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Read from Kafka, starting at the latest offsets.
val inputData = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap_servers)
  .option("subscribe", "topic-name-here")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

// Write Parquet files to HDFS, triggering a micro-batch every 60 seconds.
inputData.toDF()
  .repartition(10)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "hdfs://...")
  .option("path", "hdfs://...")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()
Any ideas why the UI shows no jobs/tasks?
Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for stream processing. Under the hood, Structured Streaming queries are optimized by the Catalyst optimizer and ultimately translated into RDDs for execution.
Structured Streaming lets you express computation on streaming data in the same way you express a batch computation on static data. The Structured Streaming engine performs the computation incrementally and continuously updates the result as streaming data arrives.
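As a minimal sketch of that batch/stream parity (the session, schema, path, and userId column here are hypothetical, purely for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parity-demo").getOrCreate()
import spark.implicits._

// Batch: read static JSON files once.
val batchDf = spark.read.json("hdfs://namenode:8020/data/events")

// Streaming: same source format; a schema must be supplied up front.
val streamDf = spark.readStream.schema(batchDf.schema).json("hdfs://namenode:8020/data/events")

// The aggregation is expressed identically in both cases;
// the engine simply runs it incrementally for the stream.
val batchCounts  = batchDf.groupBy($"userId").count()
val streamCounts = streamDf.groupBy($"userId").count()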
Now that the Direct API of Spark Streaming (we are currently on version 2.3.2) is deprecated, and since we recently added the Confluent Platform (which ships with Kafka 2.2.0) to our project, we plan to migrate these applications.
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
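For comparison, a minimal DStream sketch (the socket source and word count are illustrative only); each batch interval yields one RDD in the stream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-demo")
val ssc = new StreamingContext(conf, Seconds(60)) // micro-batch interval

// Each 60-second interval of socket data becomes one RDD in the DStream.
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()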
For anyone facing the same issue, I found the culprit:
Somehow, the data within _spark_metadata in the HDFS directory where I was saving the output had become corrupted.
The solution was to erase that directory and restart the application, which re-created it. After that, data started flowing again.
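If you would rather script the cleanup than run hdfs dfs -rm -r by hand, a sketch using the Hadoop FileSystem API could look like this (the output path is hypothetical; point it at whatever you passed to the sink's "path" option, and be aware that removing the sink metadata makes Spark forget which files it had committed):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sink location; substitute the actual "path" option value.
val metadataDir = new Path("hdfs://namenode:8020/data/output/_spark_metadata")

val fs = FileSystem.get(metadataDir.toUri, new Configuration())
// Recursively delete the corrupted sink metadata;
// the restarted query re-creates the directory on its first commit.
if (fs.exists(metadataDir)) {
  fs.delete(metadataDir, true)
}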