I deploy a structured streaming job with on the k8s operator, which simply reads from kafka, deserializes, adds 2 columns and stores the results in the datalake (tried both delta and parquet) and after days the executor increases memory and eventually i get OOM. The input records are in terms of kbs really low. P.s i use the exactly same code, but with cassandra as a sink which runs for almost a month now, without any issues. any ideas plz?
enter image description here
enter image description here
My code
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
.option("subscribe", MetisStreamsConfig.topics.head)
.option("startingOffsets", startingOffsets)
.option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.withColumn("payload", from_json($"value", schema))
// selection + filtering
.select("payload.*")
.select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
.select($"qid", $"vessel_id", $"col.*")
.filter($"timestamp".isNotNull)
.filter($"qid".isNotNull and !($"qid"===""))
.withColumn("ingestion_time", current_timestamp())
.withColumn("mapping", MappingUDF($"qid"))
writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
log.info(s"Storing batch with id: `$batchId`")
val calendarInstance = Calendar.getInstance()
val year = calendarInstance.get(Calendar.YEAR)
val month = calendarInstance.get(Calendar.MONTH) + 1
val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
batchDF.write
.mode("append")
.parquet(streamOutputDir + s"/$year/$month/$day")
}
.option("checkpointLocation", checkpointDir)
.start()
i changed to foreachBatch because using delta or parquet with partitionBy cause issues faster
There is a bug that is resolved in Spark 3.1.0.
See https://github.com/apache/spark/pull/28904
Other ways of overcoming the issue & a credit for debugging:
https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read
You may find this helpful even though you are using foreachBatch ...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With