
imap_unordered() hangs up if iterable throws an error

Suppose the file sample.py contains the following code:

from multiprocessing import Pool

def sample_worker(x):
    print "sample_worker processes item", x
    return x

def get_sample_sequence():
    for i in xrange(2,30):
        if i % 10 == 0:
            raise Exception('That sequence is corrupted!')
        yield i

if __name__ == "__main__":
    pool = Pool(24)
    try:
        for x in pool.imap_unordered(sample_worker, get_sample_sequence()):
            print "sample_worker returned value", x
    except:
        print "Outer exception caught!"
    pool.close()
    pool.join()
    print "done"

When I execute it, I get the following output:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 338, in _handle_tasks
    for i, task in enumerate(taskseq):
  File "C:\Python27\lib\multiprocessing\pool.py", line 278, in <genexpr>
    self._taskqueue.put((((result._job, i, func, (x,), {})
  File "C:\Users\renat-nasyrov\Desktop\sample.py", line 10, in get_sample_sequence
    raise Exception('That sequence is corrupted!')
Exception: That sequence is corrupted!

After that, the application hangs. How can I handle this situation so that the program does not hang?

Pehat asked Dec 18 '14 15:12


1 Answer

As septi mentioned, your indentation is (still) wrong. Indent the yield statement so that it sits inside the for loop, where i is in scope. I am not entirely sure what actually happens in your code, but yielding a variable that is out of scope does not seem like a good idea:

from multiprocessing import Pool

def sample_worker(x):
    print "sample_worker processes item", x
    return x

def get_sample_sequence():
    for i in xrange(2, 30):
        if i % 10 == 0:
            raise Exception('That sequence is corrupted!')
        yield i  # fixed

if __name__ == "__main__":
    pool = Pool(24)
    try:
        for x in pool.imap_unordered(sample_worker, get_sample_sequence()):
            print "sample_worker returned value", x
    except:
        print "Outer exception caught!"
    pool.close()
    pool.join()
    print "done"

To handle exceptions in your generator, you may use a wrapper such as this one:

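import logging

def robust_generator():
    try:
        for i in get_sample_sequence():
            logging.debug("yield " + str(i))
            yield i
    except Exception as e:
        # Log the error with its traceback, then end the generator cleanly.
        # A bare return is used because raising StopIteration inside a
        # generator becomes a RuntimeError on Python 3.7+ (PEP 479).
        logging.exception(e)
        return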
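
To see what such a wrapper does to the sequence, here is a minimal sketch that runs it without a pool. It assumes a slightly generalized wrapper that takes the iterable as an argument (that parameter is my addition, not part of the code above), and it uses range and print() so that it runs on both Python 2 and Python 3:

```python
from __future__ import print_function  # keeps print() identical on Python 2 and 3
import logging


def get_sample_sequence():
    # Same shape as the generator in the question: it fails when i reaches 10.
    for i in range(2, 30):
        if i % 10 == 0:
            raise Exception('That sequence is corrupted!')
        yield i


def robust_generator(iterable):
    # Hypothetical generalization of the wrapper: accepts any iterable.
    try:
        for item in iterable:
            yield item
    except Exception:
        # Log the traceback, then end the iteration normally.
        logging.exception("source iterable failed; stopping early")
        return


# Only the items produced before the failure come through.
items = list(robust_generator(get_sample_sequence()))
print(items)  # prints [2, 3, 4, 5, 6, 7, 8, 9]
```

Passing robust_generator(get_sample_sequence()) to pool.imap_unordered() in place of the bare generator means the pool only ever sees a sequence that ends normally. Note the trade-off: everything after the failing item is silently dropped, so the log is the only place where the error shows up.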

phobic answered Nov 09 '22 03:11