
Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources

I am trying to join two streams and write the result to a Kafka topic.

Code:

1- Read the two topics:

val PERSONINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("subscribe", "PERSONINFORMATION")
    .option("group.id", "info")
    .option("maxOffsetsPerTrigger", 1000)
    .option("startingOffsets", "earliest")
    .load()


val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "CANDIDATEINFORMATION")
    .option("group.id", "candent")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()

2- Parse data to join them:

val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
    .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s"))
    .select("s.*")

val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
    .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s"))
    .select("s.*")

val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")

3- Join two frames

  val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")

  val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))

4- Write them to a topic

  string2json.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "xxxx:9092")
      .option("topic", "toDelete")
      .option("checkpointLocation", "checkpoints")
      .option("failOnDataLoss", "false")
      .start()
      .awaitTermination()

Error message:

21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
Atheer Abdullatif asked Oct 12 '25 00:10



1 Answer

Your code looks fine to me, it is rather the checkpointing that is causing the issue.

Based on the error message, you probably first ran this job with only one stream source. You then added the code for the stream join and restarted the application without removing the existing checkpoint files. The application now tries to recover from those checkpoint files, finds that they record one source while the query requests two, and fails.

The section "Recovery Semantics after Changes in a Streaming Query" of the Structured Streaming Programming Guide explains which changes are allowed and which are not when using checkpointing. Changing the number of input sources is not allowed:

"Changes in the number or type (i.e. different source) of input sources: This is not allowed."
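If you want to confirm the mismatch before touching anything, you can look at what the existing checkpoint recorded. The following is only a rough sketch with two assumptions that are not from the question: the checkpoint directory lives on the local filesystem, and each file under its offsets subdirectory holds a version line, a metadata line, and then one line per source. That layout is an internal detail of Spark and may differ between versions. The object and method names are made up for illustration.

```scala
import java.io.File
import scala.io.Source

object CheckpointSources {
  // Returns the number of sources recorded in the newest offsets file,
  // or None if the checkpoint has no offsets yet.
  // ASSUMPTION: local filesystem, and the layout "version line, metadata line,
  // one line per source" (internal to Spark, not a public contract).
  def sourcesInLatestBatch(checkpointDir: String): Option[Int] = {
    val offsetsDir = new File(checkpointDir, "offsets")
    // Batch files are named by batch id (0, 1, 2, ...); skip hidden/.crc files.
    val batchFiles = Option(offsetsDir.listFiles).getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.nonEmpty && f.getName.forall(_.isDigit))
    if (batchFiles.isEmpty) None
    else {
      val latest = batchFiles.maxBy(_.getName.toLong)
      val src = Source.fromFile(latest, "UTF-8")
      // Subtract the version line and the metadata line.
      try Some(math.max(0, src.getLines().size - 2)) finally src.close()
    }
  }
}
```

Calling CheckpointSources.sourcesInLatestBatch("checkpoints") on the checkpoint from the question should, under these assumptions, report the single source mentioned in the error message.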

To solve your problem: Delete the current checkpoint files and re-start the job.
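As a minimal sketch of the deletion step, assuming the relative checkpointLocation "checkpoints" resolves to a directory on the local filesystem (if it resolves to HDFS or another distributed filesystem, that filesystem's own tooling would be needed instead). ResetCheckpoint and deleteRecursively are hypothetical names, not part of Spark. Keep in mind that removing the checkpoint also discards the stored offsets, so the query starts again from startingOffsets ("earliest" in your code) and may write records to the output topic a second time.

```scala
import java.io.File

object ResetCheckpoint {
  // Recursively deletes a local file or directory.
  // Returns true if nothing is left at that path afterwards.
  def deleteRecursively(f: File): Boolean = {
    // listFiles returns null for anything that is not a directory.
    Option(f.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete() || !f.exists()
  }

  def main(args: Array[String]): Unit = {
    // ASSUMPTION: "checkpoints" is the local directory used as checkpointLocation.
    val cleared = deleteRecursively(new File("checkpoints"))
    println(s"checkpoint directory cleared: $cleared")
  }
}
```

Run this once while the streaming job is stopped, then start the job again. Pointing checkpointLocation at a new, empty directory has the same effect and keeps the old files around in case you need them.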

Michael Heil answered Oct 15 '25 01:10
