We have a streaming application implemented with Spark Structured Streaming that reads data from Kafka topics and writes it to an HDFS location.
Sometimes the application fails with this exception:
_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
We have not been able to resolve this issue.
The only solution I have found is to delete the checkpoint location files, which makes the job read the topic from the beginning as soon as the application is started again. However, this is not feasible for a production application.
Does anyone have a solution for this error that does not require deleting the checkpoint, so that I can continue from where the last run failed?
Sample code of the application:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()
[...] // do some processing
dfProcessed.writeStream
.format("csv")
.option("format", "append")
.option("path", hdfsPath)
.option("checkpointLocation", "")
.outputMode("append")
.start()
The error message
_spark_metadata/n.compact doesn't exist when compacting batch n+10
can show up when you stop a streaming job that writes to a File Sink, change the output path of the File Sink while keeping the same checkpointLocation, and then restart the job.
Since you do not want to delete your checkpoint files, you can copy the missing Spark metadata files from the old File Sink output path to the new output path. See below for what the "missing Spark metadata files" are.
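As an illustration of that copy step, here is a minimal sketch for the case where both output paths are on a local file system, as in the /home/mike/... listings in this answer. The object and method names (CopySparkMetadata, copyMissingMetadata) are hypothetical, and on HDFS you would have to do the equivalent with the Hadoop file system tooling instead of java.nio. Files that already exist in the new folder are left untouched. Stop the streaming job before trying anything like this.

```scala
import java.io.File
import java.nio.file.{Files, Path}

// Hypothetical helper, not part of Spark: copies metadata files that exist in
// <oldOutPath>/_spark_metadata but not yet in <newOutPath>/_spark_metadata.
// Assumes both paths are on the local file system.
object CopySparkMetadata {
  def copyMissingMetadata(oldOutPath: Path, newOutPath: Path): List[String] = {
    val oldMeta = oldOutPath.resolve("_spark_metadata")
    val newMeta = newOutPath.resolve("_spark_metadata")
    Files.createDirectories(newMeta)

    // listFiles() returns null if the directory does not exist or cannot be read.
    val candidates = Option(oldMeta.toFile.listFiles()).getOrElse(Array.empty[File])

    candidates.toList
      .filter(_.isFile)
      .filter(f => !Files.exists(newMeta.resolve(f.getName))) // never overwrite
      .map { f =>
        Files.copy(f.toPath, newMeta.resolve(f.getName))
        f.getName
      }
      .sorted // names of the files that were actually copied
  }
}
```

A call would look like CopySparkMetadata.copyMissingMetadata(Paths.get("/home/mike/outPathBefore"), Paths.get("/home/mike/outPathAfter")), with Paths imported from java.nio.file. The return value lists the copied file names, which makes it easy to check that 9.compact and the batch files after it were moved over.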
To understand why this IllegalStateException is being thrown, we need to understand what is happening behind the scenes in the provided file output path. Let outPathBefore be the name of this path. While your streaming job is running and processing data, it creates a folder outPathBefore/_spark_metadata. In that folder you will find one file per micro-batch, named after the micro-batch identifier, containing the list of (partitioned) files the data has been written to, e.g.:
/home/mike/outPathBefore/_spark_metadata$ ls
0 1 2 3 4 5 6 7
In this case we have details for 8 micro batches. The content of one of the files looks like
/home/mike/outPathBefore/_spark_metadata$ cat 0
v1
{"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}
By default, on every tenth micro-batch these files are compacted, meaning the entries of micro-batches 0, 1, 2, ..., 9 are stored in a single compacted file called 9.compact.
This procedure continues for the subsequent ten batches, i.e. in micro-batch 19 the job combines 9.compact with the entries of batches 10, 11, 12, ..., 19 into a new compacted file, 19.compact.
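The schedule described above is simple arithmetic, which the following sketch tries to make explicit. It is my own illustration of the behaviour described in this answer, not code taken from Spark; the object and method names are made up, and the interval of 10 is the default mentioned above.

```scala
// Illustrative model of the compaction schedule described above.
// All names are hypothetical; this is not Spark's implementation.
object CompactionSchedule {
  // Default interval: every tenth micro-batch is a compaction batch.
  val DefaultCompactInterval: Int = 10

  // Batches 9, 19, 29, ... are compaction batches for an interval of 10.
  def isCompactionBatch(batchId: Long, compactInterval: Int = DefaultCompactInterval): Boolean =
    (batchId + 1) % compactInterval == 0

  // Name of the metadata file written for a given batch.
  def metadataFileName(batchId: Long, compactInterval: Int = DefaultCompactInterval): String =
    if (isCompactionBatch(batchId, compactInterval)) s"$batchId.compact" else batchId.toString

  // Earlier metadata files that must be readable to build <compactionBatchId>.compact.
  def filesNeededForCompaction(
      compactionBatchId: Long,
      compactInterval: Int = DefaultCompactInterval): Seq[String] = {
    require(isCompactionBatch(compactionBatchId, compactInterval),
      s"batch $compactionBatchId is not a compaction batch")
    val first = math.max(0L, compactionBatchId - compactInterval)
    (first until compactionBatchId).map(metadataFileName(_, compactInterval))
  }
}
```

For example, CompactionSchedule.filesNeededForCompaction(19) yields 9.compact followed by 10 through 18, which are the earlier files the job has to find when it writes 19.compact.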
Now, imagine you had the streaming job running until micro batch 15 which means the job has created the following files:
/home/mike/outPathBefore/_spark_metadata/0
/home/mike/outPathBefore/_spark_metadata/1
...
/home/mike/outPathBefore/_spark_metadata/8
/home/mike/outPathBefore/_spark_metadata/9.compact
/home/mike/outPathBefore/_spark_metadata/10
...
/home/mike/outPathBefore/_spark_metadata/15
After micro-batch 15 you stopped the streaming job and changed the output path of the File Sink to, say, outPathAfter. As you keep the same checkpointLocation, the streaming job continues with micro-batch 16. However, it now creates the metadata files in the new output path:
/home/mike/outPathAfter/_spark_metadata/16
/home/mike/outPathAfter/_spark_metadata/17
...
Now, and this is where the exception is thrown: when reaching micro-batch 19, the job tries to compact the ten most recent metadata files in the _spark_metadata folder. However, it only finds the files 16, 17 and 18, and does not find 9.compact, 10, etc. Hence the error message says:
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
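To see which files you would have to copy in a situation like this, a small check along the following lines can help. It is only a sketch under the assumptions made in this answer (default compact interval of 10, file names as shown in the listings above), and the object and method names are hypothetical. The set of present file names is passed in, so you can feed it from whatever directory listing you have.

```scala
// Hypothetical diagnostic, not part of Spark: given the file names present in
// the new _spark_metadata folder, report which earlier metadata files are
// missing for the compaction at `compactionBatchId`.
object MissingMetadataCheck {
  def missingForCompaction(
      present: Set[String],
      compactionBatchId: Long,
      compactInterval: Int = 10): Seq[String] = {
    // The compaction reads the previous compacted file (if any) plus every
    // batch file written since then.
    val first = math.max(0L, compactionBatchId - compactInterval)
    val needed = (first until compactionBatchId).map { id =>
      if ((id + 1) % compactInterval == 0) s"$id.compact" else id.toString
    }
    needed.filterNot(present.contains)
  }
}
```

With the new folder containing only 16, 17 and 18, MissingMetadataCheck.missingForCompaction(Set("16", "17", "18"), 19) returns 9.compact and 10 through 15, which are exactly the files that still live under outPathBefore/_spark_metadata.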
The Structured Streaming Programming Guide explains in its section on Recovery Semantics after Changes in a Streaming Query:
"Changes to output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")"
Databricks has also written up some details in the article "Streaming with File Sink: Problems with recovery if you change checkpoint or output directories".
The error is caused by checkpointLocation, because checkpointLocation stores information about old or deleted data. You just need to delete the folder that checkpointLocation points to.
Explore more: https://kb.databricks.com/streaming/file-sink-streaming.html
Example:
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation", "D:/path/dir/checkpointLocation")
.option("path", "D:/path/dir/output")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
.awaitTermination()
You need to delete the checkpointLocation directory.