I have a Spark application (2.4.5) that uses Kafka as its source with large batch windows (5 minutes); in our application we only really care about the RDD from that specific interval to process the data.
What is happening is that our application crashes from time to time with either an OutOfMemory exception on the driver (running in client mode) or a GC OutOfMemory error on the executors. After a lot of research, it seemed that we were not handling state properly, which was causing the lineage to grow indefinitely. We considered fixing the problem either with a batch approach, where we control the offsets read from Kafka and create the RDDs from them ourselves (which would truncate the lineage), or by enabling checkpointing.
During the investigation, someone found an issue that was not really similar to ours (YARN heap usage growing over time) but was solved by tweaking some UI parameters:
Since these are UI parameters, it didn't make sense to me that they would affect the application's memory usage, unless they affect the way applications store information to send to the UI. Early tests show that the application does indeed run longer without OOM issues.
Can anyone explain what impact these parameters have on applications? Can they really affect memory usage? Are there any other parameters I should look into to get the whole picture? (I'm wondering if there is a "factor" parameter that needs to be tweaked so memory allocation is appropriate for our case.)
Thank you
After a lot of testing, our team managed to narrow the problem down to this particular parameter:
spark.sql.ui.retainedExecutions
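As a sketch of how you might tune this down, here is a spark-defaults.conf fragment lowering it alongside the other UI retention settings (the values shown are, to the best of my recollection, the Spark 2.4 defaults; the 50 for retainedExecutions is just an illustrative lower value, not a recommendation):

```
# Number of completed SQL executions kept for the UI (default 1000)
spark.sql.ui.retainedExecutions  50

# Related UI retention settings and their defaults, for comparison
spark.ui.retainedJobs            1000
spark.ui.retainedStages          1000
spark.ui.retainedTasks           100000
spark.streaming.ui.retainedBatches 1000
```

The right value depends on how large your plans are and how much UI history you need for debugging.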
I decided to dig in, so I downloaded Spark's source code. I found that information about the Parsed Logical Plan is not only kept in the application's memory, but the amount retained is also controlled by this parameter.
When a SparkSession is created, one of the many objects that are instantiated is the SQLAppStatusListener. This class implements two methods:
onExecutionStart - on every execution, creates a new SparkPlanGraphWrapper, which holds references to the Parsed Logical Plan, and adds it to a SharedState object, which in this case keeps track of how many instances of the object were created.
cleanupExecution - removes the SparkPlanGraphWrapper from the SharedState object once the number of stored objects is greater than the value of spark.sql.ui.retainedExecutions, which defaults to 1000.
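The retention behavior described above can be illustrated with a small, self-contained Python sketch. This is not Spark's actual implementation (that lives in SQLAppStatusListener, in Scala); it is a toy model showing why a large retention limit keeps many plan objects alive on the driver:

```python
from collections import OrderedDict

class ExecutionStore:
    """Toy model of the UI's retained-executions store (illustrative only)."""

    def __init__(self, retained_executions=1000):
        # Mirrors the spark.sql.ui.retainedExecutions default of 1000.
        self.retained_executions = retained_executions
        self._executions = OrderedDict()  # execution id -> plan description

    def on_execution_start(self, execution_id, plan):
        # Every execution stores a wrapper holding the (potentially large) plan.
        self._executions[execution_id] = plan

    def cleanup_execution(self):
        # Evict the oldest entries only once the limit is exceeded;
        # until then every retained plan stays in driver memory.
        while len(self._executions) > self.retained_executions:
            self._executions.popitem(last=False)

store = ExecutionStore(retained_executions=3)
for i in range(5):
    store.on_execution_start(i, plan=f"plan-{i}")
    store.cleanup_execution()
print(list(store._executions))  # only the 3 most recent survive: [2, 3, 4]
```

With the default limit of 1000, nothing is evicted until 1000 executions have completed, so the driver's heap footprint grows with every new plan until then.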
In our case specifically, each logical plan was taking about 4 MB of memory, so, in a simplistic way, we would have to allocate roughly 4 GB of memory just to accommodate the retained executions.
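The back-of-the-envelope math behind that estimate (assuming, as in our case, about 4 MB per retained plan):

```python
plan_size_mb = 4     # observed size of one Parsed Logical Plan in our application
retained = 1000      # spark.sql.ui.retainedExecutions default
total_mb = plan_size_mb * retained
print(total_mb)      # 4000 MB, i.e. roughly 4 GB of driver heap for UI state alone
```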