
Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks

I have to process a huge pandas.DataFrame (several tens of GB) row by row, where each row operation is quite lengthy (a couple of tens of milliseconds). So I had the idea to split up the frame into chunks and process each chunk in parallel using multiprocessing. This does speed up the task, but the memory consumption is a nightmare.

Although each child process should in principle only consume a tiny chunk of the data, it needs (almost) as much memory as the parent process that holds the original DataFrame. Even deleting the used parts in the parent process does not help.

I wrote a minimal example that replicates this behavior. The only thing it does is create a large DataFrame with random numbers, chunk it into pieces of at most 100 rows, and print some information about each chunk during multiprocessing (here via an mp.Pool of size 4).

The main function that is executed in parallel:

def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Simulate a lengthy per-chunk operation
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))

The helper generator to chunk a DataFrame into little pieces:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0  # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)

And the main routine:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum number of rows per chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')

The standard output looks like this:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE

The Problem:

The main process needs about 120 MB of memory. However, the child processes of the pool need the same amount of memory, although they each only contain 1% of the original DataFrame (chunks of 100 rows vs. an original length of 10000 rows). Why?

What can I do about it? Does Python (3) send the whole DataFrame to each child process despite my chunking? Is this a problem of pandas memory management or the fault of multiprocessing and its data pickling? Thanks!
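
A quick way to check the per-process memory from inside a worker, assuming psutil is installed (it is not used in the script below), is a sketch like the following; the helper name print_rss is made up for illustration:

import os

import psutil  # assumption: psutil is available (pip install psutil)


def print_rss(label):
    """Prints the resident set size (RSS) of the current process in MB"""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    print('{}: process {} uses {:.1f} MB'.format(label, os.getpid(), rss_mb))

Calling print_rss('worker') at the start of just_wait_and_print_len_and_idx shows how each pool worker's footprint compares to the size of its 100-row chunk.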



The whole script, for simple copy and paste in case you want to try it yourself:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Simulate a lengthy per-chunk operation
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0  # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum number of rows per chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()
asked Dec 20 '16 by SmCaterpillar


1 Answer

OK, so I figured it out thanks to the hint by Sebastian Opałczyński in the comments.

The problem is that the child processes are forked from the parent, so all of them contain a reference to the original DataFrame. However, the frame is manipulated in the original process (the chunks are dropped in place), so the copy-on-write behavior slowly turns the shared memory into real copies, until eventually the limit of the physical memory is reached.

There is a simple solution: Instead of pool = mp.Pool(n_jobs), I use the new context feature of multiprocessing:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

This guarantees that the Pool processes are just spawned and not forked from the parent process. Accordingly, none of them has access to the original DataFrame and all of them only need a tiny fraction of the parent's memory.

Note that mp.get_context('spawn') is only available in Python 3.4 and newer.
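
For completeness, a rough sketch of how main() from above changes with this fix (only the pool creation differs; everything else stays the same):

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum number of rows per chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))

    # Spawn fresh worker processes instead of forking the parent
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')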

answered Sep 30 '22 by SmCaterpillar