
PySpark: Numpy memory not being released in executor map-partition function (memory leak)

I have the following minimal working example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):

    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()

When the above is run, the memory of the executor's Python process increases steadily after each iteration, which suggests that the memory of the previous iteration isn't being released, i.e. a memory leak. This can lead to a job failure if the memory exceeds the executor's memory limit.

Bizarrely, any of the following prevents the memory leak:

  • Remove the line data = list(rdd)
  • Insert the line rand_data = list(rand_data.tolist()) after rand_data = np.random.random(int(1e7))
  • Remove the line int(e)

The above code is a minimal working example of a much larger project which cannot use the above fixes.
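
For readers wondering what the second workaround changes, here is a minimal sketch. It does not explain the leak; it only shows that indexing a Numpy array yields a Numpy scalar object per element, while tolist() converts all elements to built-in Python floats up front. The array size of 5 is an arbitrary choice for illustration.

```python
import numpy as np

rand_data = np.random.random(5)

# Indexing the array creates a NumPy scalar object for the element.
print(type(rand_data[0]))

# tolist() converts every element to a built-in Python float in one step.
as_list = rand_data.tolist()
print(type(as_list[0]))
```

So in the original loop, int(e) is applied to a Numpy scalar, whereas after the workaround it is applied to a plain Python float.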

Some things to note:

  • While the rdd data is not used in the function, the line is required to reproduce the leak. In the real world project, the rdd data is used.
  • The memory leak is likely due to the large Numpy array rand_data not being released
  • You have to do the int operation on each element of rand_data to reproduce the leak 🤷

Question

Can you force the PySpark executor to release the memory of rand_data by inserting code in the first few lines or last few lines of the toy_example function?

What has already been attempted

Force garbage collection by inserting at the end of the function:

del data, rand_data
import gc
gc.collect()

Force memory release by inserting at the end or beginning of the function (inspired by a Pandas issue):

import ctypes
libc = ctypes.CDLL("libc.so.6")  # glibc; loading the library once is enough
libc.malloc_trim(0)

Setup, measurement and versions

The PySpark job above was run on an AWS EMR cluster with one m4.xlarge worker node. Numpy had to be installed with pip on the worker node via bootstrapping.

The memory of the executor was measured using the following function (printed to the executor's log):

import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
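
One caveat about this measurement: ru_maxrss is the peak resident set size of the process, so it can only stay flat or grow and cannot show memory being released. Growth across equally sized iterations is still a useful signal. The sketch below, which assumes a Linux worker with /proc available, reads the current resident set size next to the peak; the helper names are made up for illustration.

```python
import resource


def peak_rss_kb():
    """Peak resident set size of this process (reported in kilobytes on Linux)."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss


def current_rss_kb():
    """Current resident set size from /proc (Linux only); None if unavailable."""
    try:
        with open("/proc/self/status") as status:
            for line in status:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1])  # the kernel reports this value in kB
    except (IOError, OSError):
        pass
    return None


print("peak:", peak_rss_kb(), "current:", current_rss_kb())
```

Logging both values at the start of each partition would show whether the current usage drops back between calls even when the peak does not.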

Spark submit config:

  • spark.executor.instances = 1
  • spark.executor.cores = 1
  • spark.executor.memory = 6g
  • spark.master = yarn
  • spark.dynamicAllocation.enabled = false

Versions:

  • EMR 5.12.1
  • Spark 2.2.1
  • Python 2.7.13
  • Numpy 1.14.0
asked Nov 01 '18 16:11 by joshlk


2 Answers

We recently ran into a very similar issue and we also could not force a memory release by changing code. What worked for us, however, was using the following Spark option: spark.python.worker.reuse = False
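
A minimal sketch of how that option could be passed, assuming the job is launched with spark-submit or configured through SparkConf. Spark reuses Python workers by default, so the option has to be set explicitly. The names WORKER_REUSE_OFF and as_submit_args are hypothetical helpers, not part of Spark.

```python
# Spark properties are strings, so the value is written as "false".
WORKER_REUSE_OFF = {"spark.python.worker.reuse": "false"}


def as_submit_args(conf):
    """Render a dict of Spark properties as spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args.extend(["--conf", "{}={}".format(key, value)])
    return args


print(" ".join(as_submit_args(WORKER_REUSE_OFF)))
# prints: --conf spark.python.worker.reuse=false

# With pyspark installed, the same dict could be applied in code:
#   from pyspark import SparkConf, SparkContext
#   conf = SparkConf().setAll(list(WORKER_REUSE_OFF.items()))
#   sc = SparkContext(conf=conf)
```

With reuse disabled, Spark starts a fresh Python worker for each task, so the previous worker's memory is returned to the operating system when it exits, at the cost of extra process start-up time.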

answered Nov 10 '22 09:11 by paul


I had a similar problem in a project where several parameters to be inserted into a database were collected in a list. The list was created inside a loop, but even after the loop ended, part of the list's memory was not released. It is a recurring problem (with different types of data) that has been discussed in several places (source 1, source 2, source 3, source 4...).

Our solution was to create a separate process and build the list there: when the process exits, the operating system reclaims its memory directly, regardless of what Python would have done with it. The other solution is the one suggested by @paul: setting the spark.python.worker.reuse option to false does something similar, but at a more internal level in Spark.

Below is a quick benchmark of both approaches. In it, the separate-process solution appears to be faster, but it would need to be tested in a real environment with large data transfers. At least we have one more approach to try.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import numpy as np
import time
import resource
from multiprocessing import Process, Queue
import timeit


def process_data(q: Queue, rdd):
    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    q.put([[0]])


def toy_example_with_process(rdd):
    # `used_memory` size should not be increased on every call to toy_example as
    # the previous call memory should be released
    used_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    q = Queue()
    p = Process(target=process_data, args=(q, rdd))
    p.start()
    _process_result = q.get()
    p.join()

    return [[used_memory]]


def toy_example(rdd):
    # `used_memory` size should not be increased on every call to toy_example as
    # the previous call memory should be released
    used_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    return [[used_memory]]


def worker_reuse_false(df):
    """Allocations are in the mapPartitions function but the `spark.python.worker.reuse` is set to 'false'
    and prevents memory leaks"""
    memory_usage = df.rdd.mapPartitions(toy_example).collect()
    print(memory_usage)  # Just for debugging, remove


def with_process(df):
    """Allocations are inside a new Process. Memory is released by the OS"""
    memory_usage = df.rdd.mapPartitions(toy_example_with_process).collect()
    print(memory_usage)  # Just for debugging, remove


iterations = 10

# Timeit with `spark.python.worker.reuse` = 'false'
conf = SparkConf().setMaster("spark://master-node:7077").setAppName(f"Memory leak reuse false {time.time()}")
conf = conf.set("spark.python.worker.reuse", 'false')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.range(0, int(1e5), numPartitions=16)
worker_reuse_time = timeit.timeit(lambda: worker_reuse_false(df), number=iterations)
print(f'Worker reuse: {round(worker_reuse_time, 3)} seconds')


# Timeit with external Process
sc.stop()  # Needed to set a new SparkContext config
conf = SparkConf().setMaster("spark://master-node:7077").setAppName(f"Memory leak with process {time.time()}")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.range(0, int(1e5), numPartitions=16)
with_process_time = timeit.timeit(lambda: with_process(df), number=iterations)
print(f'With process: {round(with_process_time, 3)} seconds')
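
The separate-process idea can also be wrapped in a small reusable helper. The following is a sketch under stated assumptions, not the code used in the benchmark: it assumes Python 3 on a Unix-like system (it requests the "fork" start method explicitly), and the names run_in_child and build_big_list are hypothetical. Unlike the Queue version above, it reports an exception raised in the child instead of leaving the parent waiting.

```python
import multiprocessing as mp


def _child(conn, func, args):
    # Runs in the child process: send back either the result or the error text.
    try:
        conn.send(("ok", func(*args)))
    except Exception as exc:
        conn.send(("error", repr(exc)))
    finally:
        conn.close()


def run_in_child(func, *args):
    """Call func(*args) in a forked child process and return its result."""
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_child, args=(child_conn, func, args))
    proc.start()
    child_conn.close()  # the parent keeps only the receiving end
    try:
        status, payload = parent_conn.recv()  # receive before join()
    except EOFError:
        status, payload = "error", "child exited without sending a result"
    proc.join()
    if status == "error":
        raise RuntimeError(payload)
    return payload


def build_big_list(n):
    # Stand-in for the memory-hungry work; only the small result crosses the pipe.
    data = list(range(n))
    return sum(data)


print(run_in_child(build_big_list, 1000))  # prints 499500
```

Only the return value is pickled and sent back, so the large intermediate objects live and die inside the child. Inside mapPartitions, the partition function would call run_in_child with the heavy function and return its small result.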
answered Nov 10 '22 09:11 by Genarito