Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Python Multiprocessing: Handling Child Errors in Parent

I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it into a relational (flat) structure, convert all values to string and insert them into mysql.

Each of these steps is submitted as a process and given import/export queues, safe for the mongoDB export which is handled in the parent.

As you will see below, I use queues and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that, if a child process runs into an unhandled Exception, this is not recognized by the parent and the rest just Keeps running. What I want to happen is that the whole shebang quits and at best reraise the child error.

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" to the queue to kill the child is pretty dirty.

I am using python 2.7.

Here are the essential parts of my code:

# Establish communication queues mongo_input_result_q = multiprocessing.Queue() mapper_result_q = multiprocessing.Queue() converter_result_q = multiprocessing.Queue() 


    # create child processes     # all processes generated here are subclasses of "multiprocessing.Process"      # create mapper     mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)                for i in range(10)]      # create datatype converter, converts everything to str     converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)                   for i in range(10)]      # create mysql writer     # I create a list of writers. currently only one,      # but I have the option to parallellize it further     writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q                , columns, 'w_'+mysql_table, 1000) for i in range(1)]      # starting mapper     for mapper in mappers:         mapper.start()     time.sleep(1)      # starting converter     for converter in converters:         converter.start()      # starting writer     for writer in writers:         writer.start() 

[... initializing mongo db connection ...]

    # put each dataset read to queue for the mapper     for row in mongo_collection.find({inc_column: {"$gte": start}}):         mongo_input_result_q.put(row)         count += 1         if count % log_counter == 0:             print 'Mongo Reader' + " " + str(count)     print "MongoReader done"      # Processes are terminated when they read "None" object from queue     # now that reading is finished, put None for each mapper in the queue so they terminate themselves     # the same for all followup processes     for mapper in mappers:         mongo_input_result_q.put(None)     for mapper in mappers:         mapper.join()     for converter in converters:         mapper_result_q.put(None)     for converter in converters:         converter.join()     for writer in writers:         converter_result_q.put(None)     for writer in writers:         writer.join() 
like image 492
drunken_monkey Avatar asked Nov 12 '13 08:11


1 Answers

Why not to let the Process to take care of its own exceptions, like this:

from __future__ import print_function import multiprocessing as mp import traceback  class Process(mp.Process):     def __init__(self, *args, **kwargs):         mp.Process.__init__(self, *args, **kwargs)         self._pconn, self._cconn = mp.Pipe()         self._exception = None      def run(self):         try:             mp.Process.run(self)             self._cconn.send(None)         except Exception as e:             tb = traceback.format_exc()             self._cconn.send((e, tb))             # raise e  # You can still rise this exception if you need to      @property     def exception(self):         if self._pconn.poll():             self._exception = self._pconn.recv()         return self._exception 

Now you have, both error and traceback at your hands:

def target():     raise ValueError('Something went wrong...')  p = Process(target = target) p.start() p.join()  if p.exception:     error, traceback = p.exception     print(traceback) 

Regards, Marek

like image 76
mrkwjc Avatar answered Oct 13 '22 06:10
