Here's the program:
#!/usr/bin/python
import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0, 100000):
        pool.apply_async(worker, callback=dummy_func)
    # clean up
    pool.close()
    pool.join()
I found that memory usage (both VIRT and RES) kept growing until close()/join(). Is there any way to get rid of this? I tried maxtasksperchild with Python 2.7, but it didn't help either.
I have a more complicated program that calls apply_async() ~6M times, and at around the ~1.5M mark I already had 6 GB+ RES; to rule out all other factors, I simplified the program to the version above.
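For reference, setting maxtasksperchild only changes the Pool constructor; the limit of 100 below is arbitrary:

# Each worker process is retired and replaced after 100 tasks (arbitrary limit),
# so a child's memory is periodically returned to the OS.
pool = multiprocessing.Pool(processes=16, maxtasksperchild=100)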
EDIT:
It turned out this version works better; thanks for everyone's input:
#!/usr/bin/python
import multiprocessing

ready_list = []

def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0, 1000000):
        result[index] = pool.apply_async(worker, (index,), callback=dummy_func)
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []
    # clean up
    pool.close()
    pool.join()
I didn't put any lock there, as I believe the main process is single-threaded (the callback is more or less an event-driven thing, per the docs I read).
I changed v1's index range to 1,000,000, same as v2, and ran some tests. It's weird to me that v2 is even ~10% faster than v1 (33s vs 37s); maybe v1 was doing too much internal list maintenance. v2 is definitely the winner on memory usage: it never went over 300M (VIRT) and 50M (RES), while v1 used to hit 370M/120M, with 330M/85M at best. All numbers come from only 3-4 test runs, so treat them as reference only.
Python does not free memory back to the system immediately after it destroys an object instance. It has object pools, called arenas, and it takes a while until those are released. In some cases you may be suffering from memory fragmentation, which also causes the process's memory usage to grow.
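A rough way to see the fragmentation effect on Linux (reading RSS from /proc is a platform assumption; the numbers are illustrative):

def rss_kb():
    # Linux-only: resident set size in kB from /proc/self/status
    with open('/proc/self/status') as f:
        for line in f:
            if line.startswith('VmRSS:'):
                return int(line.split()[1])

print('start:    ', rss_kb())
data = [object() for _ in range(10**6)]   # fill many pymalloc arenas
print('filled:   ', rss_kb())
keep = data[::1000]                        # keep a scattered 0.1% of the objects alive
del data                                   # the rest are destroyed...
print('after del:', rss_kb())              # ...but partly-used arenas cannot be returned to the OS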
The multiprocessing.shared_memory module (new in Python 3.8) provides a class, SharedMemory, for the allocation and management of shared memory to be accessed by one or more processes on a multicore or symmetric multiprocessor (SMP) machine.
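A minimal sketch of that API (Python 3.8+), run in a single process here just to keep it short:

from multiprocessing import shared_memory

# Create a 16-byte block and write into it through the memoryview.
shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b'hello'

# Another process would attach to the same block by name.
other = shared_memory.SharedMemory(name=shm.name)
print(bytes(other.buf[:5]))   # b'hello'

other.close()
shm.close()
shm.unlink()                  # release the block once every process is done with it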
Using Pool: the Pool class in multiprocessing can handle an enormous number of tasks. It allows you to run multiple jobs per worker process, thanks to its ability to queue the jobs. Memory is allocated only for the executing worker processes, unlike the Process class, which allocates a process (and its memory) per task.
As we have seen, Pool keeps only the executing worker processes in memory, while Process keeps a process for every task in memory; so when the number of tasks is small we can use the Process class, and when the number of tasks is large we can use Pool.
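As a rough sketch of the contrast (the square function and the counts here are just illustrative):

from multiprocessing import Pool, Process

def square(n):
    return n * n

if __name__ == '__main__':
    # Pool: 4 worker processes work through a queue of 1000 tasks.
    with Pool(processes=4) as pool:
        results = pool.map(square, range(1000))

    # Process: one OS process per task -- fine for a handful of tasks,
    # but the same 1000 tasks would mean spawning 1000 processes.
    procs = [Process(target=square, args=(n,)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()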
I had memory issues recently, since I was calling the multiprocessing function multiple times, so it kept spawning processes and leaving them in memory.
Here's the solution I'm using now:
def myParallelProcess(ahugearray):
    from multiprocessing import Pool
    from contextlib import closing
    with closing(Pool(15)) as p:
        res = p.imap_unordered(simple_matching, ahugearray, 100)
        return res
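simple_matching is not shown above; a hypothetical stand-in and the calling pattern would look roughly like this (the function must live at module level so it can be pickled):

def simple_matching(item):
    # hypothetical per-item function, only here to make the sketch runnable
    return item * 2

if __name__ == '__main__':
    for value in myParallelProcess(range(100000)):
        pass  # consume the unordered results as they arrive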