I just noticed that my program is using more and more memory as it processes a large file. It's only processing one line at a time, though, so I couldn't figure out why it would keep using more memory.
After a lot of digging, I realised that the program has three parts:
1. Produce the data for each task with a generator (step 1).
2. Process the data in a multiprocessing.Pool of worker processes using imap_unordered() (step 2).
3. Consume the results in the main process (step 3).
If steps 1 and 2 are faster than step 3, then the results from the pool workers will queue up, consuming memory.
How can I throttle the data that I feed into the pool for step 2, so it doesn't get ahead of the consumer in step 3?
This looks similar to another multiprocessing question, but it's not clear to me where the delay is in that question.
Here's a small example that demonstrates the problem:
import logging
import os
import multiprocessing
from time import sleep
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1():
    data = 'a' * 100000
    for i in xrange(10000):
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')

def process_step2(data):
    return data.upper()

def process_step3(up_data):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.

def main():
    pool = multiprocessing.Pool(processes=10)
    logger.info('Starting.')
    loader = process_step1()
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')

def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'rU') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

main()
When that runs, I see a steady increase in memory use until step 1 finishes. If I let it run long enough after that, the memory use will start to decrease.
2016-12-01 15:37:50,859:6414:139712380557056:Starting.
2016-12-01 15:37:50,861:6414:139712266237696:Producing 0.
2016-12-01 15:37:50,868:6414:139712380557056:Consuming 0, using 255.0 MB.
2016-12-01 15:37:52,054:6414:139712266237696:Producing 1000.
2016-12-01 15:37:53,244:6414:139712266237696:Producing 2000.
2016-12-01 15:37:53,421:6414:139712380557056:Consuming 500, using 383.0 MB.
2016-12-01 15:37:54,446:6414:139712266237696:Producing 3000.
2016-12-01 15:37:55,635:6414:139712266237696:Producing 4000.
2016-12-01 15:37:55,976:6414:139712380557056:Consuming 1000, using 511.2 MB.
2016-12-01 15:37:56,831:6414:139712266237696:Producing 5000.
2016-12-01 15:37:58,019:6414:139712266237696:Producing 6000.
2016-12-01 15:37:58,529:6414:139712380557056:Consuming 1500, using 703.2 MB.
2016-12-01 15:37:59,209:6414:139712266237696:Producing 7000.
2016-12-01 15:38:00,406:6414:139712266237696:Producing 8000.
2016-12-01 15:38:01,084:6414:139712380557056:Consuming 2000, using 831.5 MB.
2016-12-01 15:38:01,602:6414:139712266237696:Producing 9000.
2016-12-01 15:38:02,802:6414:139712266237696:Finished producing.
2016-12-01 15:38:03,640:6414:139712380557056:Consuming 2500, using 959.5 MB.
2016-12-01 15:38:06,199:6414:139712380557056:Consuming 3000, using 959.5 MB.
It seems like Pool.imap_unordered() launches a new thread to iterate over the input sequence generated by step 1, so we need to throttle that thread from the main thread that is running step 3. The Semaphore class is designed for exactly this kind of limit between threads: we call acquire() before producing each item in step 1, and release() when we consume each item in step 3. If we start the semaphore at some arbitrary value like 100, the producer will build up a buffer of 100 items before blocking and waiting for the consumer to catch up.
import logging
import os
import multiprocessing
from threading import Semaphore
from time import sleep
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1(semaphore):
    data = 'a' * 100000
    for i in xrange(10000):
        semaphore.acquire()
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')

def process_step2(data):
    return data.upper()

def process_step3(up_data, semaphore):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.
    semaphore.release()

def main():
    pool = multiprocessing.Pool(processes=10)
    semaphore = Semaphore(100)
    logger.info('Starting.')
    loader = process_step1(semaphore)
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data, semaphore)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')

def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'rU') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

main()
Now the memory usage is steady, because the producer doesn't get very far ahead of the consumer.
2016-12-01 15:52:13,833:6695:140124578850560:Starting.
2016-12-01 15:52:13,835:6695:140124535109376:Producing 0.
2016-12-01 15:52:13,841:6695:140124578850560:Consuming 0, using 255.0 MB.
2016-12-01 15:52:16,424:6695:140124578850560:Consuming 500, using 255.0 MB.
2016-12-01 15:52:18,498:6695:140124535109376:Producing 1000.
2016-12-01 15:52:19,015:6695:140124578850560:Consuming 1000, using 255.0 MB.
2016-12-01 15:52:21,602:6695:140124578850560:Consuming 1500, using 255.0 MB.
2016-12-01 15:52:23,675:6695:140124535109376:Producing 2000.
2016-12-01 15:52:24,192:6695:140124578850560:Consuming 2000, using 255.0 MB.
2016-12-01 15:52:26,776:6695:140124578850560:Consuming 2500, using 255.0 MB.
2016-12-01 15:52:28,846:6695:140124535109376:Producing 3000.
2016-12-01 15:52:29,362:6695:140124578850560:Consuming 3000, using 255.0 MB.
2016-12-01 15:52:31,951:6695:140124578850560:Consuming 3500, using 255.0 MB.
2016-12-01 15:52:34,022:6695:140124535109376:Producing 4000.
2016-12-01 15:52:34,538:6695:140124578850560:Consuming 4000, using 255.0 MB.
2016-12-01 15:52:37,128:6695:140124578850560:Consuming 4500, using 255.0 MB.
2016-12-01 15:52:39,193:6695:140124535109376:Producing 5000.
2016-12-01 15:52:39,704:6695:140124578850560:Consuming 5000, using 255.0 MB.
2016-12-01 15:52:42,291:6695:140124578850560:Consuming 5500, using 255.0 MB.
2016-12-01 15:52:44,361:6695:140124535109376:Producing 6000.
2016-12-01 15:52:44,878:6695:140124578850560:Consuming 6000, using 255.0 MB.
2016-12-01 15:52:47,465:6695:140124578850560:Consuming 6500, using 255.0 MB.
If you're using multiprocessing.Pool, consider upgrading to concurrent.futures.process.ProcessPoolExecutor, because it handles killed workers better. That doesn't affect the problem described in this question, though.
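In case it's useful, here's a rough, untested sketch of how the same throttling idea could look with ProcessPoolExecutor. Executor.map() submits every task up front, so this version instead bounds the number of in-flight tasks by hand with concurrent.futures.wait(); produce(), work(), and consume() are just hypothetical stand-ins for steps 1 to 3 above.

import concurrent.futures

def produce():
    # Stand-in for step 1: generate the data for each task.
    for i in range(10000):
        yield 'a' * 100000

def work(data):
    # Stand-in for step 2: runs in the worker processes.
    return data.upper()

def consume(result):
    # Stand-in for step 3: runs in the main process.
    assert result == 'A' * 100000

def main():
    max_in_flight = 100  # Arbitrary buffer size, like Semaphore(100) above.
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        pending = set()
        for data in produce():
            if len(pending) >= max_in_flight:
                # Block until at least one task finishes, then consume
                # whatever is done so results don't pile up in memory.
                done, pending = concurrent.futures.wait(
                    pending,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    consume(future.result())
            pending.add(executor.submit(work, data))
        # Drain the tasks that are still in flight.
        for future in concurrent.futures.as_completed(pending):
            consume(future.result())

if __name__ == '__main__':
    main()

The idea is the same as the semaphore version: never let the number of queued-up tasks and results grow past a fixed limit.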