I'm trying to perform some costly scientific calculations with Python. I have to read a bunch of data stored in CSV files and process them. Since each process takes a long time and I have 8 processors to use, I was trying to use the Pool class from multiprocessing.
This is how I structured the multiprocessing call:
pool = Pool()
vector_components = []
for sample in range(samples):
    vector_field_x_i = vector_field_samples_x[sample]
    vector_field_y_i = vector_field_samples_y[sample]
    vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                       vector_field_x_i, vector_field_y_i))
    vector_components.append(vector_component)
pool.close()
pool.join()
vector_components = map(lambda k: k.get(), vector_components)
for vector_component in vector_components:
    CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
I was running a data set of 500 samples of size equal to 100 (x_dim) by 100 (y_dim).
Until then everything worked fine.
Then I received a data set of 500 samples of 400 x 400.
When running it, I get an error when calling get.
I also tried to run a single sample of 400 x 400 and got the same error.
Traceback (most recent call last):
  File "__init__.py", line 33, in <module>
    VfD.samples_vector_field_decomposer(samples, x_dim, y_dim, x_steps, y_steps, vector_field_samples_x, vector_field_samples_y)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in samples_vector_field_decomposer
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in <lambda>
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/.pyenv/versions/2.7.5/lib/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
MemoryError
What should I do?
Thank you in advance.
Right now you're keeping several lists in memory: vector_field_samples_x, vector_field_samples_y, vector_components, and then a separate copy of vector_components during the map call (which is when you actually run out of memory). You can avoid needing either copy of the vector_components list by using pool.imap instead of pool.apply_async along with a manually created list. imap returns an iterator instead of a complete list, so you never have all the results in memory.
Normally, pool.map breaks the iterable passed to it into chunks and sends those chunks to the child processes, rather than sending one element at a time. This helps improve performance. Because imap uses an iterator instead of a list, it doesn't know the complete size of the iterable you're passing to it. Without knowing the size of the iterable, it doesn't know how big to make each chunk, so it defaults to a chunksize of 1, which will work, but may not perform all that well. To avoid this, you can provide it with a good chunksize argument, since you know the iterable is samples elements long. It may not make much difference for your 500-element list, but it's worth experimenting with.
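As a worked example of that arithmetic, here is a small sketch of the heuristic pool.map uses in CPython (roughly four chunks per worker), applied to 500 samples on 8 workers. The function name map_style_chunksize is hypothetical, and the floor of 1 is my own addition for the empty-input case.

```python
def map_style_chunksize(n_items, n_workers):
    """Pick a chunk size the way pool.map does: about four chunks per worker."""
    chunksize, extra = divmod(n_items, 4 * n_workers)
    if extra:
        # Round up so no items are left over.
        chunksize += 1
    # Assumption: never go below 1, since a chunk must hold at least one item.
    return max(1, chunksize)


if __name__ == "__main__":
    # 500 samples on 8 workers: divmod(500, 32) == (15, 20), so 16 per chunk.
    print(map_style_chunksize(500, 8))
```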
Here's some sample code that demonstrates all this:
import multiprocessing
from functools import partial

def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    # vector_fields is an (x, y) pair for a single sample.
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.

if __name__ == "__main__":
    # samples, x_dim, y_dim, x_steps, y_steps, the vector_field_samples_*
    # lists and CsvH are assumed to be defined as in the question.
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples, 4 * num_workers)
    if extra:
        chunksize += 1
    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    # (xrange is Python 2; use range on Python 3.)
    results = pool.imap(func,
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    # imap yields results in input order, so enumerate recovers the sample index.
    for sample, vector_component in enumerate(results):
        CsvH.write_vector_field(vector_component,
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()
This should allow you to avoid the MemoryError issues, but if not, you could try running imap over smaller chunks of your total sample, and just do multiple passes. I don't think you'll have any issues though, because you're not building any additional lists, other than the vector_field_* lists you start with.