
IllegalStateException: _spark_metadata/0 doesn't exist while compacting batch 9

We have a streaming application, implemented using Spark Structured Streaming, which reads data from Kafka topics and writes it to an HDFS location.

Sometimes the application fails with an exception:

_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

We are not able to resolve this issue.

The only solution I found is to delete the checkpoint location files, which makes the job read the topic/data from the beginning as soon as we run the application again. However, this is not a feasible solution for a production application.

Does anyone have a solution for this error that does not require deleting the checkpoint, so that I can continue from where the last run failed?

Sample code of the application:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

[...] // do some processing

dfProcessed.writeStream
  .format("csv")
  .option("path", hdfsPath)
  .option("checkpointLocation", "")
  .outputMode("append")
  .start()
Asked May 31 '19 by Azim Kangda

2 Answers

The error message

_spark_metadata/n.compact doesn't exist when compacting batch n+10

can show up when you

  • process some data into a FileSink with checkpoint enabled, then
  • stop your streaming job, then
  • change the output directory of the FileSink while keeping the same checkpointLocation, then
  • restart the streaming job (a minimal sketch of this sequence follows below)
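A minimal sketch of that sequence, assuming a SparkSession named spark; the broker, topic, and paths below are hypothetical:

// First run: the File Sink writes to outPathBefore, with a fixed checkpoint location.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
  .option("subscribe", "events")                      // hypothetical topic
  .load()
  .writeStream
  .format("csv")
  .option("path", "/data/outPathBefore")              // original output directory
  .option("checkpointLocation", "/data/checkpoint")   // checkpoint reused across runs
  .outputMode("append")
  .start()

// Later: stop the job, change ONLY the path option to /data/outPathAfter while
// keeping the same checkpointLocation, and restart. Around batch 19 the query
// fails because 9.compact only exists under /data/outPathBefore/_spark_metadata.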

Solution

As you do not want to delete your checkpoint files, you could simply copy the missing Spark metadata files from the old File Sink output path to the new output path. See below to understand what the "missing Spark metadata files" are.
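
A minimal sketch of such a copy using the Hadoop FileSystem API, assuming the job is stopped and spark is an active SparkSession; the two paths are hypothetical and the exact file names depend on how far your job had progressed:

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = spark.sparkContext.hadoopConfiguration

// Hypothetical old and new File Sink output directories.
val oldMetadata = new Path("hdfs:///data/outPathBefore/_spark_metadata")
val newMetadata = new Path("hdfs:///data/outPathAfter/_spark_metadata")
val fs = oldMetadata.getFileSystem(conf)

// Copy every metadata file (0, 1, ..., 9.compact, 10, ...) that is missing in
// the new output path, so that the next compaction finds all earlier batches.
fs.listStatus(oldMetadata).foreach { status =>
  val target = new Path(newMetadata, status.getPath.getName)
  if (!fs.exists(target)) {
    FileUtil.copy(fs, status.getPath, fs, target, false, conf)
  }
}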

Background

To understand why this IllegalStateException is being thrown, we need to understand what is happening behind the scenes in the provided file output path. Let outPathBefore be the name of this path. While your streaming job is running and processing data, it creates a folder outPathBefore/_spark_metadata. In that folder you will find one file per micro-batch, named after the micro-batch identifier and containing the list of files (partitioned files) the data has been written to, e.g.:

/home/mike/outPathBefore/_spark_metadata$ ls
0 1 2 3 4 5 6 7

In this case we have details for 8 micro-batches. The content of one of these files looks like this:

/home/mike/outPathBefore/_spark_metadata$ cat 0
v1
{"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}

By default, on every tenth micro-batch these files are compacted, meaning the contents of the files 0, 1, 2, ..., 9 will be stored in a single compacted file called 9.compact.

This procedure continues for the subsequent ten batches, i.e. at micro-batch 19 the job aggregates 9.compact together with the files 10, 11, 12, ..., 18 into 19.compact.
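
As an aside, the value 10 in the error message ("compactInterval: 10") is the file sink metadata log's compact interval. To my knowledge it can be changed via the spark.sql.streaming.fileSink.log.compactInterval setting, shown below only as an illustration; a different interval does not bring back metadata files that are already missing:

// The default compact interval is 10, which matches "(compactInterval: 10)"
// in the error message. Changing it only affects future compactions.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "20")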

Now imagine you had the streaming job running until micro-batch 15, which means the job has created the following files:

/home/mike/outPathBefore/_spark_metadata/0
/home/mike/outPathBefore/_spark_metadata/1
...
/home/mike/outPathBefore/_spark_metadata/8
/home/mike/outPathBefore/_spark_metadata/9.compact
/home/mike/outPathBefore/_spark_metadata/10
...
/home/mike/outPathBefore/_spark_metadata/15

After the fifteenth micro-batch you stopped the streaming job and changed the output path of the File Sink to, say, outPathAfter. As you keep the same checkpointLocation, the streaming job continues with micro-batch 16, but it now creates the metadata files in the new output path:

/home/mike/outPathAfter/_spark_metadata/16
/home/mike/outPathAfter/_spark_metadata/17
...

Now, and this is where the exception is thrown: when reaching micro-batch 19, the job tries to compact the ten latest files from the Spark metadata folder. However, it can only find the files 16, 17, 18; it does not find 9.compact, 10, etc. Hence the error message says:

java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

Documentation

The Structured Streaming Programming Guide explains this under Recovery Semantics after Changes in a Streaming Query:

"Changes to output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")"

Databricks has also written about this in the article Streaming with File Sink: Problems with recovery if you change checkpoint or output directories.

Answered Sep 28 '22 by Michael Heil

The error is caused by the checkpointLocation, because the checkpoint stores information about old or deleted data. You just need to delete the folder that the checkpointLocation points to.

Explore more: https://kb.databricks.com/streaming/file-sink-streaming.html

Example:

import org.apache.spark.sql.streaming.Trigger

df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()

That is, you need to delete the directory configured as checkpointLocation.
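
If you do go this route, a minimal sketch of removing the checkpoint directory through the Hadoop FileSystem API; the path is the hypothetical one from the example above:

import org.apache.hadoop.fs.Path

// WARNING: this discards all streaming progress; on restart the query
// re-reads the source according to its startingOffsets option.
val checkpointDir = new Path("D:/path/dir/checkpointLocation")
val fs = checkpointDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.delete(checkpointDir, true)   // recursive delete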

Answered Sep 28 '22 by Manh Quang Do