I swear I saw the following in an example somewhere, but now I can't find that example and this isn't working. The __call__ class function never gets called.
EDIT: Code updated
pool.map appears to start the QueueWriter instance and the __call__ function is reached. However, the workers never seem to start or at least no results are pulled from the queue. Is my queue set up the right way? Why do the workers not fire off?
import multiprocessing as mp
import os
import random
class QueueWriter(object):
def __init__(self, **kwargs):
self.grid = kwargs.get("grid")
self.path = kwargs.get("path")
def __call__(self, q):
print self.path
log = open(self.path, "a", 1)
log.write("QueueWriter called.\n")
while 1:
res = q.get()
if res == 'kill':
self.log.write("QueueWriter received 'kill' message. Closing Writer.\n")
break
else:
self.log.write("This is where I'd write: {0} to grid file.\n".format(res))
log.close()
log = None
class Worker(object):
def __init__(self, **kwargs):
self.queue = kwargs.get("queue")
self.grid = kwargs.get("grid")
def __call__(self, idx):
res = self.workhorse(self, idx)
self.queue.put((idx,res))
return res
def workhorse(self,idx):
#in reality a fairly complex operation
return self.grid[idx] ** self.grid[idx]
if __name__ == '__main__':
# log = open(os.path.expanduser('~/minimal.log'), 'w',1)
path = os.path.expanduser('~/minimal.log')
pool = mp.Pool(mp.cpu_count())
manager = mp.Manager()
q = manager.Queue()
grid = [random.random() for _ in xrange(10000)]
# in actuality grid is a shared resource, read by Workers and written
# to by QueueWriter
qWriter = QueueWriter(grid=grid, path=path)
watcher = pool.map(qWriter, (q,),1)
wrkr = Worker(queue=q,grid=grid)
result = pool.map(wrkr, range(10000), 1)
result.get()
q.put('kill')
pool.close()
pool.join()
So the log does indeed print the initialization message, but then __call__ function is never called. Is this one of those pickling issues I've seen discussed so often? I've found answers about class member functions, but what about class instances?
At the gentle and patient prodding of martineau (thanks!) I think I've ironed out the problems. I have yet to apply it to my original code, but it is working in the example above and I'll start new questions for future implementation problems.
So in addition to changing where in the code the target file (the log, in this example) gets opened, I also started the QueueWriter instance as a single multiprocessing process rather than using pool.map. As martineau pointed out the map call blocks until the qWriter.__call__() returns and this prevented the workers from being called.
There were some other bugs in the code above, but those were incidental and fixed below:
import multiprocessing as mp
import os
import random
class QueueWriter(object):
def __init__(self, **kwargs):
self.grid = kwargs.get("grid")
self.path = kwargs.get("path")
def __call__(self, q):
print self.path
log = open(self.path, "a", 1)
log.write("QueueWriter called.\n")
while 1:
res = q.get()
if res == 'kill':
log.write("QueueWriter received 'kill' message. Closing Writer.\n")
break
else:
log.write("This is where I'd write: {0} to grid file.\n".format(res))
log.close()
log = None
class Worker(object):
def __init__(self, **kwargs):
self.queue = kwargs.get("queue")
self.grid = kwargs.get("grid")
def __call__(self, idx):
res = self.workhorse(idx)
self.queue.put((idx,res))
return res
def workhorse(self,idx):
#in reality a fairly complex operation
return self.grid[idx] ** self.grid[idx]
if __name__ == '__main__':
# log = open(os.path.expanduser('~/minimal.log'), 'w',1)
path = os.path.expanduser('~/minimal.log')
pool = mp.Pool(mp.cpu_count())
manager = mp.Manager()
q = manager.Queue()
grid = [random.random() for _ in xrange(10000)]
# in actuality grid is a shared resource, read by Workers and written
# to by QueueWriter
qWriter = QueueWriter(grid=grid, path=path)
# watcher = pool.map(qWriter, (q,),1)
# Start the writer as a single process rather than a pool
p = mp.Process(target=qWriter, args=(q,))
p.start()
wrkr = Worker(queue=q,grid=grid)
result = pool.map(wrkr, range(10000), 1)
# result.get()
# not required for pool
q.put('kill')
pool.close()
p.join()
pool.join()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With