I have the following minimal working example:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np
sc = SparkContext()
sqlContext = SQLContext(sc)
# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)
def toy_example(rdd):
    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]
# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()
When the above is run, the memory of the executor's Python process steadily increases after each iteration, suggesting the memory from the previous iteration isn't being released - i.e., a memory leak. This can lead to a job failure if the memory exceeds the executor's memory limit.
Bizarrely, any of the following prevents the memory leak:

- Not reading in the partition, i.e. removing the line `data = list(rdd)`
- Inserting the line `rand_data = list(rand_data.tolist())` after `rand_data = np.random.random(int(1e7))`
- Deleting the line `int(e)`
The above code is a minimal working example of a much larger project which cannot use the above fixes.
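For illustration, this is roughly what the second workaround looks like when applied to the toy function (a sketch only; the function name is mine):

def toy_example_with_workaround(rdd):
    data = list(rdd)
    rand_data = np.random.random(int(1e7))
    # Converting the NumPy array into a plain Python list stops the leak
    rand_data = list(rand_data.tolist())
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)
    return [[0]]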
Some things to take notice of:

- Although the `rdd` data is not used in the function, the line is required to reproduce the leak. In the real-world project, the `rdd` data is used.
- `rand_data` is the large object that appears not to be released.
- The `int` operation on each element of `rand_data` is required to reproduce the leak 🤷

Can you force the PySpark executor to release the memory of `rand_data` by inserting code in the first few lines or last few lines of the `toy_example` function?
What I have already tried, without success:
Force garbage collection by inserting at the end of the function:
del data, rand_data
import gc
gc.collect()
Force memory release by inserting at the end or beginning of the function (inspired by a Pandas issue):
from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
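Concretely, the attempts were placed in the toy function roughly like this (a sketch of the placement only; the function name is mine):

def toy_example_with_attempted_fixes(rdd):
    # Attempt 2 (beginning or end): ask glibc to return freed heap pages to the OS
    from ctypes import CDLL
    libc = CDLL("libc.so.6")
    libc.malloc_trim(0)

    data = list(rdd)
    rand_data = np.random.random(int(1e7))
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Attempt 1 (end): drop references and force garbage collection
    del data, rand_data
    import gc
    gc.collect()

    return [[0]]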
The following PySpark job was run on an AWS EMR cluster with one m4.xlarge worker node. NumPy had to be pip-installed on the worker node via a bootstrap action.
The memory of the executor was measured using the following function (printed to the executor's log):
import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
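For reference, a minimal sketch of how this measurement could be logged from inside the partition function (the helper name is mine; on Linux, `ru_maxrss` is reported in kilobytes):

import resource

def log_peak_memory(label):
    # Peak resident set size of this Python worker process (kilobytes on Linux)
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"{label}: ru_maxrss = {peak_kb} KB", flush=True)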
Spark submit config:
Versions:
We recently ran into a very similar issue and we also could not force a memory release by changing code. What worked for us, however, was using the following Spark option:
spark.python.worker.reuse = False
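For example, the option can be set on the SparkConf before the context is created (a sketch; with worker reuse disabled, each task gets a fresh Python worker whose memory is returned to the OS when the worker exits):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.worker.reuse", "false")
sc = SparkContext(conf=conf)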
I had a similar problem in a project where several parameters to be inserted into a database were being saved in a list. That list was created inside a loop, but we saw that even after the loop ended, part of the list's memory was not released. In fact, this is a recurring problem (with different types of data) that has been discussed in several places (source 1, source 2, source 3, source 4...). The solution was to create a separate process and build the list there, since when the process exits its memory is released directly by the operating system, without Python being able to do anything (bad) about it.
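The general (non-Spark) pattern looks roughly like this; the names and sizes are only illustrative:

from multiprocessing import Process, Queue

def build_params(q: Queue):
    # Build the large list inside the child process
    params = [(i, str(i)) for i in range(5_000_000)]
    q.put(len(params))  # send back only what the parent actually needs

if __name__ == "__main__":
    q = Queue()
    p = Process(target=build_params, args=(q,))
    p.start()
    result = q.get()  # read before join() to avoid blocking on a full queue
    p.join()
    # When the child exits, all of its memory is returned to the OS
    print(result)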
Another solution is the one suggested by @paul: setting the `spark.python.worker.reuse` option to `false` does something similar, but at a more internal level in Spark.
Below is a quick benchmark of both approaches; apparently, the first solution is faster. It would need to be tested in a real environment with large data transfers, but at least we have one more approach to try to fix the problem.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import numpy as np
import time
import resource
from multiprocessing import Process, Queue
import timeit
def process_data(q: Queue, rdd):
    # Read in pySpark DataFrame partition
    data = list(rdd)
    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))
    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)
    # Return a single `0` value
    q.put([[0]])


def toy_example_with_process(rdd):
    # `used_memory` should not increase on every call to toy_example_with_process,
    # as the previous call's memory should be released
    used_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    q = Queue()
    p = Process(target=process_data, args=(q, rdd))
    p.start()
    _process_result = q.get()
    p.join()

    return [[used_memory]]


def toy_example(rdd):
    # `used_memory` should not increase on every call to toy_example,
    # as the previous call's memory should be released
    used_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    # Read in pySpark DataFrame partition
    data = list(rdd)
    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))
    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    return [[used_memory]]


def worker_reuse_false(df):
    """Allocations happen in the mapPartitions function, but `spark.python.worker.reuse`
    is set to 'false', which prevents the memory leak."""
    memory_usage = df.rdd.mapPartitions(toy_example).collect()
    print(memory_usage)  # Just for debugging, remove


def with_process(df):
    """Allocations happen inside a new Process. Memory is released by the OS."""
    memory_usage = df.rdd.mapPartitions(toy_example_with_process).collect()
    print(memory_usage)  # Just for debugging, remove
iterations = 10
# Timeit with `spark.python.worker.reuse` = 'false'
conf = SparkConf().setMaster("spark://master-node:7077").setAppName(f"Memory leak reuse false {time.time()}")
conf = conf.set("spark.python.worker.reuse", 'false')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.range(0, int(1e5), numPartitions=16)
worker_reuse_time = timeit.timeit(lambda: worker_reuse_false(df), number=iterations)
print(f'Worker reuse: {round(worker_reuse_time, 3)} seconds')
# Timeit with external Process
sc.stop() # Needed to set a new SparkContext config
conf = SparkConf().setMaster("spark://master-node:7077").setAppName(f"Memory leak with process {time.time()}")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.range(0, int(1e5), numPartitions=16)
with_process_time = timeit.timeit(lambda: with_process(df), number=iterations)
print(f'With process: {round(with_process_time, 3)} seconds')