I have written the program (below) to:
- read the input file as a pandas dataframe,
- then groupby using a specific column value to split the data and store it as a list of dataframes,
- then pipe the data to multiprocess Pool.map() to process each dataframe in parallel.

Everything is fine; the program works well on my small test dataset. But when I pipe in my large data (about 14 GB), the memory consumption increases exponentially and then the computer freezes or the job gets killed (on an HPC cluster).
I have added code to clear the memory as soon as the data/variable isn't useful. I am also closing the pool as soon as it is done. Still, with a 14 GB input I was only expecting a 2*14 GB memory burden, but it seems like a lot more is going on. I also tried tweaking chunksize, maxtasksperchild, etc., but I am not seeing any difference in optimization for either the test file or the large file.
I think improvements to this code are required at this position, where I start multiprocessing:

    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

but I am posting the whole code.
Test example: I created a test file ("genome_matrix_final-chr1234-1mb.txt") of up to 250 MB and ran the program. When I check the system monitor I can see that memory consumption increased by about 6 GB. I am not clear why so much memory is taken by a 250 MB file plus some outputs. I have shared that file via Dropbox in case it helps in seeing the real problem: https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
Can someone suggest how I can get rid of the problem?
My python script:
    #!/home/bin/python3

    import pandas as pd
    import collections
    from multiprocessing import Pool
    import io
    import time
    import resource

    print()
    print('Checking required modules')
    print()


    ''' change this input file name and/or path as need be '''
    genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
    genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
    #genome_matrix_file = "genome_matrix_final.txt"    # large file

    def main():
        with open("genome_matrix_header.txt") as header:
            header = header.read().rstrip('\n').split('\t')
            print()

        time01 = time.time()
        print('starting time: ', time01)

        '''load the genome matrix file onto pandas as dataframe.
        This makes is more easy for multiprocessing'''
        gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = gen_matrix_df.groupby('CHROM')

        # store the splitted dataframes as list of key, values(pandas dataframe) pairs
        # this list of dataframe will be used while multiprocessing
        gen_matrix_df_list = collections.OrderedDict()
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_] = data

        # clear memory
        del gen_matrix_df

        '''Now, pipe each dataframe from the list using map.Pool() '''
        p = Pool(3)  # number of pool to run at once; default at 1
        result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

        del gen_matrix_df_list  # clear memory

        p.close()
        p.join()

        # concat the results from pool.map() and write it to a file
        result_merged = pd.concat(result)
        del result  # clear memory

        pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

        print()
        print('completed all process in "%s" sec. ' % (time.time() - time01))
        print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
        print()


    '''function to convert the dataframe from genome matrix to desired output '''
    def matrix_to_vcf(matrix_df):

        print()
        time02 = time.time()

        # index position of the samples in genome matrix file
        sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                      {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                      {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                      {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                      {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                      {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                      {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                      {'8a': 32, '8b': 17}]

        # sample index stored as ordered dictionary
        sample_idx_ord_list = []
        for ids in sample_idx:
            ids = collections.OrderedDict(sorted(ids.items()))
            sample_idx_ord_list.append(ids)

        # for haplotype file
        header = ['contig', 'pos', 'ref', 'alt']

        # adding some suffixes "PI" to available sample names
        for item in sample_idx_ord_list:
            ks_update = ''
            for ks in item.keys():
                ks_update += ks
            header.append(ks_update+'_PI')
            header.append(ks_update+'_PG_al')

        #final variable store the haplotype data
        # write the header lines first
        haplotype_output = '\t'.join(header) + '\n'

        # to store the value of parsed the line and update the "PI", "PG" value for each sample
        updated_line = ''

        # read the piped in data back to text like file
        matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

        matrix_df = matrix_df.rstrip('\n').split('\n')
        for line in matrix_df:
            if line.startswith('CHROM'):
                continue

            line_split = line.split('\t')
            chr_ = line_split[0]
            ref = line_split[2]
            alt = list(set(line_split[3:]))

            # remove the alleles "N" missing and "ref" from the alt-alleles
            alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

            # if no alt alleles are found, just continue
            # - i.e : don't write that line in output file
            if len(alt_up) == 0:
                continue

            #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
            #so, we have data for CHR, POS, REF, ALT so far
            # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
            sample_data_for_vcf = []
            for ids in sample_idx_ord_list:
                sample_data = []
                for key, val in ids.items():
                    sample_value = line_split[val]
                    sample_data.append(sample_value)

                # now, update the phased state for each sample
                # also replacing the missing allele i.e "N" and "-" with ref-allele
                sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
                sample_data_for_vcf.append(str(chr_))
                sample_data_for_vcf.append(sample_data)

            # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
            # and .. write it to final haplotype file
            sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
            updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
                '\t' + sample_data_for_vcf + '\n'
            haplotype_output += updated_line

        del matrix_df  # clear memory
        print('completed haplotype preparation for chromosome/contig "%s" '
              'in "%s" sec. ' %(chr_, time.time()-time02))
        print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

        # return the data back to the pool
        return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


    ''' to monitor memory '''
    def current_mem_usage():
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


    if __name__ == '__main__':
        main()
Update for bounty hunters:
I have achieved multiprocessing using Pool.map() but the code is causing a big memory burden (input test file ~300 MB, but the memory burden is about 6 GB). I was only expecting a 3*300 MB memory burden at most.
Pool allows multiple jobs per process, which may make it easier to parallelize your program. If you have a number of jobs to run in parallel, you can make a Pool with as many processes as there are CPU cores and then pass the list of jobs to pool.map.
The easiest way to profile a single method or function is the open source memory-profiler package. It's similar to line_profiler, which I've written about before. You can use it by putting the @profile decorator around any function or method and running python -m memory_profiler myscript.
The effect is less overhead in transmitting tasks to worker processes and collecting results. This can be achieved via the “chunksize” argument to map(). This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.
Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism).
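As a rough illustration of the two points above (a pool sized to the CPU count, and `chunksize` batching items into fewer tasks), here is a minimal sketch. The names `square` and `run_pool` are made up for this example, and it assumes a Unix-like system where the "fork" start method is available.

```python
import multiprocessing as mp
import os


def square(n):
    """Stand-in for the real per-item work."""
    return n * n


def run_pool(values, processes=None, chunksize=1):
    """Map `square` over `values` using a pool of worker processes."""
    # Assumption: Unix-like OS, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    processes = processes or os.cpu_count() or 1
    with ctx.Pool(processes=processes) as pool:
        # chunksize groups consecutive items so each task carries several items.
        return pool.map(square, values, chunksize=chunksize)


if __name__ == "__main__":
    print(run_pool(range(10), processes=2, chunksize=5))
```

A larger `chunksize` reduces the number of messages between parent and workers, but it does not reduce the amount of data that is pickled and sent.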
In Python (in the following I use a 64-bit build of Python 3.6.5) everything is an object. This has its overhead, and with getsizeof we can see exactly the size of an object in bytes:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
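To compare several objects at once, a small helper along these lines can be handy. It is only a sketch: `shallow_sizes` is a hypothetical name, the exact byte counts depend on the CPython version and build, and `sys.getsizeof` reports the shallow size only (it does not follow references).

```python
import sys


def shallow_sizes(objects):
    """Map repr(obj) -> shallow size in bytes as reported by sys.getsizeof."""
    return {repr(obj): sys.getsizeof(obj) for obj in objects}


if __name__ == "__main__":
    for name, size in shallow_sizes([42, 'T', 'TT', 3.14, [], ()]).items():
        print("{:>6}: {} bytes".format(name, size))
```

Even a small integer or a one-character string costs far more than the 8 bytes a raw machine value or pointer would take.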
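If you use multiprocessing.Pool in a Unix-like environment where fork() is used (check multiprocessing.get_start_method()) to create a child process, the parent's physical memory is not copied and the copy-on-write technique is used. To see how shared memory is accounted for by the proportional set size (PSS), consider two processes: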
- Process A has 50 KiB of unshared memory
- Process B has 300 KiB of unshared memory
- Both process A and process B have 100 KiB of the same shared memory region
Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS for these two processes are as follows:
- PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
- PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB
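The arithmetic above can be written as a tiny function. This is just a sketch of the definition; the name `pss` and its argument layout are made up for the example.

```python
def pss(unshared_kib, shared_regions):
    """Proportional set size in KiB.

    shared_regions is an iterable of (size_kib, number_of_sharing_processes);
    each shared region is charged to a process in proportion to how many share it.
    """
    return unshared_kib + sum(size / n for size, n in shared_regions)


if __name__ == "__main__":
    print(pss(50, [(100, 2)]))   # process A -> 100.0
    print(pss(300, [(100, 2)]))  # process B -> 350.0
```

Summing PSS over all processes counts each shared page exactly once, which is why it is a better measure than RSS when copy-on-write is involved.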
Now let's look at your DataFrame alone. memory_profiler will help us.
justpd.py

    #!/usr/bin/env python3

    import pandas as pd
    from memory_profiler import profile

    @profile
    def main():
        with open('genome_matrix_header.txt') as header:
            header = header.read().rstrip('\n').split('\t')

        gen_matrix_df = pd.read_csv(
            'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

        gen_matrix_df.info()
        gen_matrix_df.info(memory_usage='deep')

    if __name__ == '__main__':
        main()
Now let's use the profiler:

    mprof run justpd.py
    mprof plot
We can see the plot:
and line-by-line trace:
    Line #    Mem usage    Increment   Line Contents
    ================================================
         6     54.3 MiB     54.3 MiB   @profile
         7                             def main():
         8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
         9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
        10
        11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
        12
        13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
        14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

We can see that the data frame takes ~2 GiB with a peak at ~3 GiB while it's being built. What's more interesting is the output of info.
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 4000000 entries, 0 to 3999999
    Data columns (total 34 columns):
    ...
    dtypes: int64(2), object(32)
    memory usage: 1.0+ GB

But info(memory_usage='deep') ("deep" means introspection of the data deeply by interrogating object dtypes, see below) gives:

    memory usage: 7.9 GB
Huh?! Looking outside of the process we can make sure that memory_profiler's figures are correct. sys.getsizeof also shows the same value for the frame (most probably because of a custom __sizeof__) and so will other tools that use it to estimate allocated gc.get_objects(), e.g. pympler.
    # added after read_csv
    from pympler import tracker
    tr = tracker.SummaryTracker()
    tr.print_diff()
Gives:
                                                 types |   # objects |   total size
    ================================================== | =========== | ============
                     <class 'pandas.core.series.Series |          34 |      7.93 GB
                                          <class 'list |        7839 |    732.38 KB
                                           <class 'str |        7741 |    550.10 KB
                                           <class 'int |        1810 |     49.66 KB
                                          <class 'dict |          38 |      7.43 KB
      <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                                 <class 'numpy.ndarray |          34 |      3.19 KB
So where do these 7.93 GiB come from? Let's try to explain this. We have 4M rows and 34 columns, which gives us 136M values. They are either int64 or object (which is a 64-bit pointer; see using pandas with large data for a detailed explanation). Thus we have 136 * 10 ** 6 * 8 / 2 ** 20 ~1038 MiB only for values in the data frame. What about the remaining ~6.9 GiB?
To understand the behaviour it's necessary to know that Python does string interning. There are two good articles (one, two) about string interning in Python 2. Besides the Unicode change in Python 3 and PEP 393 in Python 3.3, the C structures have changed, but the idea is the same. Basically, every short string that looks like an identifier will be cached by Python in an internal dictionary and references will point to the same Python objects. In other words, we can say it behaves like a singleton. The articles mentioned above explain what significant memory profile and performance improvements it gives. We can check if a string is interned using the interned field of PyASCIIObject:
    import ctypes

    class PyASCIIObject(ctypes.Structure):
        _fields_ = [
            ('ob_refcnt', ctypes.c_size_t),
            ('ob_type', ctypes.py_object),
            ('length', ctypes.c_ssize_t),
            ('hash', ctypes.c_int64),
            ('state', ctypes.c_int32),
            ('wstr', ctypes.c_wchar_p)
        ]
Then:
    >>> a = 'name'
    >>> b = '!@#$'
    >>> a_struct = PyASCIIObject.from_address(id(a))
    >>> a_struct.state & 0b11
    1
    >>> b_struct = PyASCIIObject.from_address(id(b))
    >>> b_struct.state & 0b11
    0
With two strings we can also do an identity comparison (which is an address-in-memory comparison in the case of CPython).
    >>> a = 'foo'
    >>> b = 'foo'
    >>> a is b
    True
    >>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
    True
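Literals in one session are easy to share, so a slightly more convincing check builds the strings at run time and interns them explicitly. This is a minimal sketch using the standard `sys.intern`; the helper names `build` and `interning_demo` are made up for the example.

```python
import sys


def build(parts):
    """Assemble a string at run time so it is not a compile-time constant."""
    return ''.join(parts)


def interning_demo():
    a = build(['f', 'o', 'o', '!'])
    b = build(['f', 'o', 'o', '!'])
    equal = (a == b)  # same value
    # After explicit interning both names refer to one shared object.
    ia, ib = sys.intern(a), sys.intern(b)
    return equal, ia is ib


if __name__ == "__main__":
    print(interning_demo())
```

Once many references point at one shared string object, each additional reference costs only a pointer, not another copy of the string.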
Because of that fact, in regard to the object dtype, the data frame allocates at most 20 strings (one per amino acid). Though, it's worth noting that Pandas recommends categorical types for enumerations.
Thus we can explain the naive estimate of 7.93 GiB like:
    >>> rows = 4 * 10 ** 6
    >>> int_cols = 2
    >>> str_cols = 32
    >>> int_size = 8
    >>> str_size = 58
    >>> ptr_size = 8
    >>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
    7.927417755126953

Note that str_size is 58 bytes, not 50 as we've seen above for a 1-character literal. It's because PEP 393 defines compact and non-compact strings. You can check it with sys.getsizeof(gen_matrix_df.REF[0]).
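The gap between the naive "deep" figure and the real footprint can be reproduced with a plain list standing in for an object column. This is only a sketch with made-up helper names (`naive_size`, `deduplicated_size`); it is not how pandas computes its numbers internally, it just shows the same double-counting effect.

```python
import sys


def naive_size(values):
    """Container size plus the size of every element, counting repeats each time."""
    return sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)


def deduplicated_size(values):
    """Container size plus the size of each distinct object (by identity) once."""
    unique = {id(v): v for v in values}
    return sys.getsizeof(values) + sum(sys.getsizeof(v) for v in unique.values())


if __name__ == "__main__":
    column = ['T'] * 1000000  # one string object referenced a million times
    print('naive        :', naive_size(column))
    print('deduplicated :', deduplicated_size(column))
```

The naive figure grows with the per-string size times the number of rows, while the deduplicated one is essentially the pointer array plus a single string.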
Actual memory consumption should be ~1 GiB, as reported by gen_matrix_df.info(), but it's twice as much. We can assume it has something to do with memory (pre)allocation done by Pandas or NumPy. The following experiment shows that it's not without reason (multiple runs show the same picture):
    Line #    Mem usage    Increment   Line Contents
    ================================================
         8     53.1 MiB     53.1 MiB   @profile
         9                             def main():
        10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
        11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
        12
        13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
        14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
        15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
        16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        ...
        23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
        ...
        47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
I want to finish this section with a quote from a recent article about design issues and the future Pandas2 by the original author of Pandas.
pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset
Let's come to the pool, finally, and see if we can make use of copy-on-write. We'll use smemstat (available from an Ubuntu repository) to estimate process group memory sharing and glances to write down system-wide free memory. Both can write JSON.
We'll run the original script with Pool(2). We'll need 3 terminal windows.
smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
glances -t 1 --export-json glances.json
mprof run -M script.py
Then mprof plot produces:
The sum chart (mprof run --nopython --include-children ./script.py) looks like:
Note that the two charts above show RSS. The hypothesis is that because of copy-on-write it doesn't reflect actual memory usage. Now we have two JSON files from smemstat and glances. I'll use the following script to convert the JSON files to CSV.
    #!/usr/bin/env python3

    import csv
    import sys
    import json

    def smemstat():
        with open('smemstat.json') as f:
            smem = json.load(f)

        rows = []
        fieldnames = set()
        for s in smem['smemstat']['periodic-samples']:
            row = {}
            for ps in s['smem-per-process']:
                if 'script.py' in ps['command']:
                    for k in ('uss', 'pss', 'rss'):
                        row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

            # smemstat produces empty samples, backfill from previous
            if rows:
                for k, v in rows[-1].items():
                    row.setdefault(k, v)

            rows.append(row)
            fieldnames.update(row.keys())

        with open('smemstat.csv', 'w') as out:
            dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
            dw.writeheader()
            list(map(dw.writerow, rows))

    def glances():
        rows = []
        fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
                      'free', 'mem_critical', 'inactive', 'shared',
                      'history_size', 'mem_warning', 'total', 'active', 'buffers']
        with open('glances.csv', 'w') as out:
            dw = csv.DictWriter(out, fieldnames=fieldnames)
            dw.writeheader()
            with open('glances.json') as f:
                for l in f:
                    d = json.loads(l)
                    dw.writerow(d['mem'])

    if __name__ == '__main__':
        globals()[sys.argv[1]]()
First let's look at free memory.
The difference between the first value and the minimum is ~4.15 GiB. And here is what the PSS figures look like:
And the sum:
Thus we can see that because of copy-on-write actual memory consumption is ~4.15 GiB. But we're still serialising data to send it to worker processes via Pool.map. Can we leverage copy-on-write here as well?
To use copy-on-write we need to have the list(gen_matrix_df_list.values()) be accessible globally so the worker after fork can still read it.
Let's modify the code after del gen_matrix_df in main like the following:
    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list

    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
Remove the del gen_matrix_df_list that goes later. And modify the first lines of matrix_to_vcf like:
    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
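Stripped of the pandas specifics, the pattern looks roughly like this. It is a self-contained sketch under stated assumptions: plain lists stand in for the data frames, `SHARED_CHUNKS`, `chunk_total` and `process_all` are hypothetical names, and the "fork" start method (Unix-like systems) is requested explicitly because the trick depends on workers inheriting the parent's memory.

```python
import multiprocessing as mp

# Module-level name that forked workers inherit from the parent process.
SHARED_CHUNKS = []


def chunk_total(i):
    """Worker: look the chunk up by index instead of receiving it pickled."""
    return sum(SHARED_CHUNKS[i])


def process_all(chunks, processes=2):
    """Publish the chunks globally, then fork workers and send them only indices."""
    global SHARED_CHUNKS
    SHARED_CHUNKS = list(chunks)
    ctx = mp.get_context("fork")  # assumption: fork is available
    with ctx.Pool(processes) as pool:
        # Only small integers cross the process boundary as task arguments.
        return pool.map(chunk_total, range(len(SHARED_CHUNKS)))


if __name__ == "__main__":
    print(process_all([[1, 2, 3], [10, 20], []]))
```

The pool has to be created after the global is assigned, otherwise the forked workers would see the empty list.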
Now let's re-run it. Free memory:
Process tree:
And its sum:
Thus we're at maximum of ~2.9 GiB of actual memory usage (the peak main process has while building the data frame) and copy-on-write has helped!
As a side note, there's so-called copy-on-read, the behaviour of Python's reference cycle garbage collector, described in Instagram Engineering (which led to gc.freeze in issue31558). But gc.disable() doesn't have an impact in this particular case.
An alternative to copy-on-write copy-less data sharing can be delegating it to the kernel from the beginning by using numpy.memmap. Here's an example implementation from the High Performance Data Processing in Python talk. The tricky part is then to make Pandas use the mmapped NumPy array.
When you use multiprocessing.Pool a number of child processes will be created using the fork() system call. Each of those processes starts off with an exact copy of the memory of the parent process at that time. Because you're loading the csv before you create the Pool of size 3, each of those 3 processes in the pool will unnecessarily have a copy of the data frame. (gen_matrix_df as well as gen_matrix_df_list will exist in the current process as well as in each of the 3 child processes, so 4 copies of each of these structures will be in memory.)
Try creating the Pool before loading the file (at the very beginning, actually). That should reduce the memory usage.
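The ordering being suggested can be sketched as follows. This is a toy example, not the asker's script: `load_lines` is a hypothetical stand-in for the pd.read_csv step, `count_fields` stands in for the real worker, and the "fork" start method is assumed to be available.

```python
import multiprocessing as mp


def count_fields(line):
    """Worker: trivial stand-in for the real per-chunk processing."""
    return len(line.split('\t'))


def load_lines():
    """Hypothetical loader standing in for the expensive file read."""
    return ['a\tb\tc', 'd\te', 'f']


def main():
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    pool = ctx.Pool(2)            # workers are forked while the parent is still small
    try:
        data = load_lines()       # loaded in the parent only, after the fork
        return pool.map(count_fields, data)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(main())
```

With this ordering the workers never inherit the loaded data; they only receive the items that map() sends them.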
If it's still too high, you can:
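Dump gen_matrix_df_list to a file, one pickled item after another, e.g.:

    import pickle

    # binary mode: pickle writes bytes, and every dump() is a self-delimiting record
    with open('tempfile.pkl', 'wb') as f:
        for item in gen_matrix_df_list.items():
            pickle.dump(item, f)

Use Pool.imap() on an iterator over the items that you dumped in this file, e.g.:

    def iter_pickled(path):
        # yield one (key, value) item at a time until the file is exhausted
        with open(path, 'rb') as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    break

    results = list(p.imap(matrix_to_vcf, iter_pickled('tempfile.pkl')))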
(Note that matrix_to_vcf takes a (key, value) tuple in the example above, not just a value.)
I hope that helps.
NB: I haven't tested the code above. It's only meant to demonstrate the idea.