Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Structured Streaming OOM

I deploy a structured streaming job with on the k8s operator, which simply reads from kafka, deserializes, adds 2 columns and stores the results in the datalake (tried both delta and parquet) and after days the executor increases memory and eventually i get OOM. The input records are in terms of kbs really low. P.s i use the exactly same code, but with cassandra as a sink which runs for almost a month now, without any issues. any ideas plz?

enter image description here

enter image description here

My code

spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
    .option("subscribe", MetisStreamsConfig.topics.head)
    .option("startingOffsets", startingOffsets)
    .option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    .withColumn("payload", from_json($"value", schema))

    // selection + filtering
    .select("payload.*")
    .select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
    .select($"qid", $"vessel_id", $"col.*")
    .filter($"timestamp".isNotNull)
    .filter($"qid".isNotNull and !($"qid"===""))
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("mapping", MappingUDF($"qid"))
  writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      log.info(s"Storing batch with id: `$batchId`")
      val calendarInstance = Calendar.getInstance()

      val year = calendarInstance.get(Calendar.YEAR)
      val month = calendarInstance.get(Calendar.MONTH) + 1
      val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
      batchDF.write
        .mode("append")
        .parquet(streamOutputDir + s"/$year/$month/$day")
    }
    .option("checkpointLocation", checkpointDir)
    .start()

i changed to foreachBatch because using delta or parquet with partitionBy cause issues faster

like image 870
Giannis Polyzos Avatar asked Jun 13 '26 19:06

Giannis Polyzos


1 Answers

There is a bug that is resolved in Spark 3.1.0.

See https://github.com/apache/spark/pull/28904

Other ways of overcoming the issue & a credit for debugging:

https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read

You may find this helpful even though you are using foreachBatch ...

like image 181
Boris Avatar answered Jun 16 '26 19:06

Boris