I am using pool.map to populate a dictionary called nodes. To be clear: this dictionary is populated after pool.map returns, so sharing the variable between processes is not a concern. Everything the function returns, and everything in the dictionary, IS picklable. The dictionary is essentially a graph. When I go 1, 2, or 3 levels deep into populating this graph, the program runs flawlessly. However, at 4 deep the program doesn't seem to crash but just freezes. I set up print statements in the function I am mapping, and at the very end of its run it prints the statement at the very top of the function and then freezes. Here is how I am calling pool.map:
currentNode = startingNode
nodesPopulated = [currentNode]
connections = []
merger = []
pool = Pool(cpu_count())
for currentDepth in range(1, depth):
    print('=' * 70)
    print("= At depth", currentDepth)
    connections = []
    for item in nodesPopulated:
        if item is not None:
            if item.isPopulated():
                connections += list(item.getConnections().values())
    print("= Current number of connections:", len(connections))
    print("= Current number of nodesPopulated in this iteration:", len(nodesPopulated))
    print("= Total number of nodes:", len(self.nodes.keys()))
    nodesPopulated = pool.map(self.populateTopicNode, connections)
    print('\n= Successfully populated another round of nodes')
    for node in nodesPopulated:
        if node is not None:
            if node.isPopulated():  # was `item.isPopulated()`: a stale variable from the loop above
                self.nodes[node.getTopic().getName()] = node
                # self.populatedNodes[node.getTopic().getName()] = True
    print('= Updated self.nodes\n')
pool.close()
pool.join()
print('\nCount =', len(list(self.nodes.keys())))
return
Once again, I have made sure everything returned into nodesPopulated is picklable. I am at my wits' end: running this program 4 deep takes about 2 hours, and without pool.map it works flawlessly but takes about 6 hours. I don't want to ditch multiprocessing, but I can't figure this out and it takes forever to debug. The last thing it prints before freezing is 'D', which is at the top of self.populateTopicNode. I also suspect the objects getting too large (both self.nodes and connections) may be why this is freezing.
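Since the suspicion above is that connections has simply grown too large, one quick check (a sketch using stand-in dictionaries in place of the question's real TopicNode objects) is to measure how many bytes pool.map would have to pickle and push through the pipe:

```python
import pickle

def payload_size(items):
    # Bytes that pool.map must serialize and push through the pipe
    # when these items are used as the task arguments.
    return len(pickle.dumps(items, protocol=pickle.HIGHEST_PROTOCOL))

# Stand-in for the question's `connections` list; the real check would
# pass the actual list right before the pool.map call.
connections = [{"topic": f"t{i}", "edges": list(range(50))} for i in range(1000)]
print(f"pool.map will pickle about {payload_size(connections)} bytes")
```

Printing this at each depth would show whether the payload balloons at depth 4, which is where the hang appears.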
Note: I am certain this is a multiprocessing issue, because I ran this exact code with pool.map replaced by a for loop and it ran to completion without error. So something is causing pool.map to freeze. There is no error message; it just gets hung at the very first reference to the parameter of the function. Here are the first few lines of populateTopicNode:
def populateTopicNode(self, node: TopicNode):
    print('D')
    if node.isPopulated():
        return None
The last thing seen on the console before freezing is 'D'.
EDIT: I did some tests to give you the exact numbers of when it hangs: it hangs while using about 1300 MB of memory.
EDIT2:
Okay, so I found out that it IS returning something, not just hanging randomly. It returns None and then hangs. I am unsure why, because there are plenty of times when it returns None and works fine. I also wrapped my function in a try/except to see whether returning an exception to the parent was the problem, and that isn't it either: no exceptions are being caught, and it IS running to the point where it returns. It just hangs after returning.
EDIT3:
It breaks at the same exact spot every single iteration. I print the name of the current Topic being processed, and it always breaks at the same spot, on the same line, and then hangs. I am unsure whether that helps, but it is additional information: it consistently breaks at the same exact point.
From the multiprocessing programming guidelines:

As far as possible one should try to avoid shifting large amounts of data between processes.

multiprocessing.Pool relies on a locked buffer (an OS pipe) to distribute the tasks between the workers and retrieve their results. If an object larger than the buffer is pushed through the pipe, there are chances the logic might hang.

I'd suggest you dump the jobs to files (using pickle, for example) and send the filenames to the child processes. In this way each process can retrieve the data independently. Not only do you prevent your logic from getting stuck, but you will notice speed improvements as well, since the pipe is a severe bottleneck in your design.
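The suggestion can be sketched as follows, with a trivial sum standing in for the real per-node work and hypothetical names (process_job, dump_jobs) not taken from the question: each chunk of work is pickled to its own file, and only the short file paths travel through the pipe.

```python
import os
import pickle
import tempfile
from multiprocessing import Pool

def process_job(path):
    # Each worker loads its own input from disk, independently of the pipe.
    with open(path, "rb") as f:
        job = pickle.load(f)
    result = sum(job)  # stand-in for the real per-node work
    os.remove(path)    # clean up the consumed job file
    return result

def dump_jobs(chunks, directory):
    # Write each chunk of work to its own pickle file; return the paths.
    paths = []
    for i, chunk in enumerate(chunks):
        path = os.path.join(directory, f"job_{i}.pkl")
        with open(path, "wb") as f:
            pickle.dump(chunk, f, protocol=pickle.HIGHEST_PROTOCOL)
        paths.append(path)
    return paths

def main():
    with tempfile.TemporaryDirectory() as tmp:
        paths = dump_jobs([[1, 2, 3], [4, 5, 6]], tmp)
        with Pool(2) as pool:
            # Only the file names cross the pipe, never the large payloads.
            return pool.map(process_job, paths)

if __name__ == "__main__":
    print(main())
```

Results still come back through the pipe; if they are also large, the workers can write their results to files the same way and return only the output paths.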