
Is there a good way to avoid memory deep copy or to reduce time spent in multiprocessing?

I am building a memory-based, real-time calculation module for "big data" using the Pandas module in the Python environment.

Response time is therefore the critical quality attribute of this module.

To process the large data set, I split the data and process the sub-splits in parallel.

Much of the time is spent storing the results of the sub data (line 21).

I think that either a memory deep copy occurs internally, or the sub data passed is not shared in memory.

If I had written the module in C or C++, I would use a pointer or a reference, like below:

"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"

or

"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe&):.... "

Is there a good way to avoid the memory deep copy or to reduce the time spent in multiprocessing? "Not elegant" is fine; I am ready to make my code dirty. I tried weakref, RawValue, RawArray, Value, and Pool, but all failed.

The module is being developed on macOS and will finally run on Linux or Unix.

Do not consider Windows OS.

Here is the code.

The real code is at my office, but the structure and logic are the same as the real one.

1 #-*- coding: UTF-8 -*-
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column'] = np.abs(split_sub_dataframe['column_01'] + split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15    I think the whole sub-dataframe result is copied into resultList by value,
16    not by reference, and this is where most of the time is spent.
17    Compare the elapsed time with line 21 commented out versus uncommented.
18    On MS Windows no significant difference in elapsed time shows up;
19    on Linux or Mac OS the difference is big.
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 processes in parallel, I split the dataframe into five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)
asked Oct 27 '13 by Bruce Jung


2 Answers

You will get better performance if you keep interprocess communication to a minimum. Therefore, instead of passing sub-DataFrames as arguments, just pass index values. The subprocess can slice the common DataFrame itself.

When a subprocess is spawned, it gets a copy of all the globals defined in the calling module of the parent process. Thus, if the large DataFrame, df, is defined in the globals before you spawn a multiprocessing pool, then each spawned subprocess will have access to df.

On Windows, where there is no fork(), a new Python process is started and the calling module is imported. Thus, on Windows, the spawned subprocess has to regenerate df from scratch, which can take time and much additional memory.

On Linux, however, you have copy-on-write. This means that the spawned subprocess accesses the original globals (of the calling module) without copying them. Only when the subprocess tries to modify the global does Linux then make a separate copy before the value is modified.
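To make the copy-on-write point concrete, here is a minimal sketch (an illustration under assumed sizes and chunking, not the code from this answer): a module-level DataFrame is created before the pool, and each worker only reads its slice and returns a small scalar, so nothing close to the full frame is ever pickled or copied on Linux or macOS.

import numpy as np
import pandas as pd
import multiprocessing as mp

# Module-level global: created before the Pool, so fork()-ed workers see it
# via copy-on-write pages instead of receiving a pickled copy.
df = pd.DataFrame(np.random.randn(10**6, 4),
                  columns=['column_01', 'column_02', 'column_03', 'column_04'])

def chunk_mean(bounds):
    # Read-only access: as long as the worker does not modify df,
    # no page of the parent's data is copied.
    start, end = bounds
    return df['column_01'].iloc[start:end].mean()

if __name__ == '__main__':
    pool = mp.Pool(4)
    bounds = [(i, i + 250000) for i in range(0, 10**6, 250000)]
    # Only the small (start, end) tuples and the returned floats cross
    # the process boundary.
    partial_means = pool.map(chunk_mean, bounds)
    pool.close()
    pool.join()
    print(np.mean(partial_means))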

So you can enjoy a performance gain if you avoid modifying globals in your subprocesses. I suggest using the subprocess only for computation. Return the value of the computation, and let the main process collate the results to modify the original DataFrame.

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
    start, end, arr = retval
    # assign by position; iloc excludes `end`, matching the slice taken in compute()
    df.iloc[start:end, df.columns.get_loc('new_column')] = arr.values

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])

if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])

    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')

    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())
answered by unutbu


Inspired by this question and @unutbu's answer, I wrote a parallel version of map on GitHub. The function is suitable for embarrassingly parallel processing of a read-only large data structure on a single machine with multiple cores. The basic idea is similar to what @unutbu suggested: use a temporary global variable to hold the big data structure (e.g., a data frame), and pass its "name" rather than the variable itself to the workers. But all of this is encapsulated in a map function, so that it is almost a drop-in replacement for the standard map function, with the help of the pathos package. Example usage is as follows:

# Suppose we process a big dataframe with about a billion rows.
size = 10**9
df = pd.DataFrame(np.random.randn(size, 4),
                  columns=['column_01', 'column_02', 
                           'column_03', 'column_04'])
# divide df into sections of 10000 rows; each section will be
# processed by one worker at a time
section_size = 10000
sections = [xrange(start, start+section_size) 
            for start in xrange(0, size, section_size)]

# The worker function that processes one section of the
# df. The key assumption is that a child
# process does NOT modify the dataframe, but does some
# analysis or aggregation and returns some result.
def func(section, df):
    return some_processing(df.iloc[section])

num_cores = 4
# sections (local_args) specify which parts of the big object are to be processed;
# global_arg holds the big object itself, to avoid unnecessary copies;
# the result is a list of objects, each of which is the result of processing
# one part of the big object (i.e., one element of the iterable sections),
# in order.
results = map(func, sections, global_arg=df,
              chunksize=10, 
              processes=num_cores)

# reduce results (assume it is a list of data frames)
result = pd.concat(results)

In some of my text-mining tasks, a naive parallel implementation that passes df directly to the worker function is even slower than the single-threaded version, due to the expensive copy of the large data frame. The implementation above, however, gives a more than 3x speedup for those tasks on 4 cores, which comes close to real lightweight multithreading.
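To make that copy cost visible, here is a rough, hedged timing sketch (the sizes, chunking, and function names are illustrative assumptions, not part of the library above): the first pool call passes the whole DataFrame to every task, the second passes only index ranges and lets the workers read a module-level global.

import time
import numpy as np
import pandas as pd
import multiprocessing as mp

# Module-level global, shared copy-on-write with fork()-ed workers.
df = pd.DataFrame(np.random.randn(2 * 10**6, 4),
                  columns=['column_01', 'column_02', 'column_03', 'column_04'])
bounds = [(i, i + 500000) for i in range(0, len(df), 500000)]

def mean_by_copy(args):
    # The whole DataFrame is pickled and sent to the worker for every task.
    frame, (start, end) = args
    return frame['column_01'].iloc[start:end].mean()

def mean_by_index(b):
    # Only the (start, end) tuple is sent; the worker reads the shared global.
    start, end = b
    return df['column_01'].iloc[start:end].mean()

if __name__ == '__main__':
    pool = mp.Pool(4)

    t0 = time.time()
    pool.map(mean_by_copy, [(df, b) for b in bounds])
    print('passing the DataFrame: %.3fs' % (time.time() - t0))

    t0 = time.time()
    pool.map(mean_by_index, bounds)
    print('passing index ranges : %.3fs' % (time.time() - t0))

    pool.close()
    pool.join()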

answered by Fashandge