Memory Error with Multiprocessing in Python

I'm trying to perform some costly scientific calculations with Python. I have to read a bunch of data stored in CSV files and process them. Since each sample takes a long time to process and I have 8 processors available, I was trying to use the Pool class from the multiprocessing module.

This is how I structured the multiprocessing call:

    pool = Pool()
    vector_components = []
    for sample in range(samples):
        vector_field_x_i = vector_field_samples_x[sample]
        vector_field_y_i = vector_field_samples_y[sample]
        vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                           vector_field_x_i, vector_field_y_i))
        vector_components.append(vector_component)
    pool.close()
    pool.join()

    vector_components = map(lambda k: k.get(), vector_components)

    for vector_component in vector_components:
        CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')

I was running a data set of 500 samples, each of size 100 (x_dim) by 100 (y_dim).

With that, everything worked fine.

Then I received a data set of 500 samples of 400 x 400.

When running it, I get an error when calling get() on the results.

I also tried to run a single sample of 400 x 400 and got the same error.

Traceback (most recent call last):
  File "__init__.py", line 33, in <module>
    VfD.samples_vector_field_decomposer(samples, x_dim, y_dim, x_steps, y_steps, vector_field_samples_x, vector_field_samples_y)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in samples_vector_field_decomposer
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in <lambda>
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/.pyenv/versions/2.7.5/lib/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
MemoryError

What should I do?

Thank you in advance.

asked Aug 21 '14 by pceccon

1 Answer

Right now you're keeping several lists in memory: vector_field_samples_x, vector_field_samples_y, vector_components, and then a separate copy of vector_components during the map call (which is when you actually run out of memory). You can avoid needing either copy of the vector_components list by using pool.imap instead of pool.apply_async along with a manually created list. imap returns an iterator instead of a complete list, so you never have all the results in memory at once.

Normally, pool.map breaks the iterable passed to it into chunks and sends those chunks to the child processes, rather than sending one element at a time. This helps improve performance. Because imap receives an iterator instead of a list, it doesn't know the complete size of the iterable you're passing to it. Without knowing the size, it doesn't know how big to make each chunk, so it defaults to a chunksize of 1, which will work but may not perform all that well. To avoid this, you can provide a good chunksize argument yourself, since you know the iterable is samples elements long (with 500 samples and 8 workers, for example, the calculation below gives a chunksize of 16). It may not make much difference for your 500-element list, but it's worth experimenting with.

Here's some sample code that demonstrates all this:

import multiprocessing
from functools import partial


def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.


if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples, 4 * num_workers)
    if extra:
        chunksize += 1

    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for sample, vector_component in enumerate(results):
        CsvH.write_vector_field(vector_component,
                                '../CSV/RotationalFree/rotational_free_x_' + str(sample) + '.csv')
    pool.close()
    pool.join()

This should allow you to avoid the MemoryError issues, but if not, you could try running imap over smaller chunks of your total sample set and just do multiple passes. I don't think you'll have any issues, though, because you're not building any additional lists other than the vector_field_samples_* lists you start with.
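If you do end up needing multiple passes, a minimal sketch of that batched approach might look like this (batch_size is an illustrative value; pool, func, samples, vector_field_samples_x, vector_field_samples_y, and CsvH are reused from the example above):

# Hypothetical batched variant: only one batch of inputs and results
# is in flight at a time. All names are reused from the example above.
batch_size = 50  # illustrative; tune to whatever fits in memory
for start in xrange(0, samples, batch_size):
    # Generator expression again, so the batch inputs aren't materialized as a list.
    batch = ((vector_field_samples_x[s], vector_field_samples_y[s])
             for s in xrange(start, min(start + batch_size, samples)))
    for offset, vector_component in enumerate(pool.imap(func, batch)):
        CsvH.write_vector_field(vector_component,
                                '../CSV/RotationalFree/rotational_free_x_' + str(start + offset) + '.csv')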

answered by dano