I am currently experimenting with multiprocessing and queues. I have written a piece of code that exports data from MongoDB, maps it into a relational (flat) structure, converts all values to strings and inserts them into MySQL.
Each of these steps runs as its own process and is given import/export queues, save for the MongoDB export, which is handled in the parent.
As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not notice and everything else just keeps running. What I want is for the whole shebang to quit and, ideally, to re-raise the child's error.
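To illustrate, here is a minimal repro with a hypothetical worker (not my actual code): the child dies, but join() returns normally and the parent only sees a non-zero exit code.

import multiprocessing

def worker():
    raise ValueError('boom')  # unhandled exception in the child

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()           # returns normally; the exception does not propagate
    print(p.exitcode)  # 1 -- the only hint in the parent that the child failed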
I have two questions:

1. How can I detect the child error in the parent?
2. After detecting it, how can I stop the remaining child processes?

I am using Python 2.7.
Here are the essential parts of my code:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"

# create mappers
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
           for i in range(10)]

# create datatype converters, convert everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
              for i in range(10)]

# create mysql writer
# I create a list of writers. Currently only one,
# but I have the option to parallelize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema,
                                      converter_result_q, columns, 'w_' + mysql_table, 1000)
           for i in range(1)]

# starting mappers
for mapper in mappers:
    mapper.start()
time.sleep(1)

# starting converters
for converter in converters:
    converter.start()

# starting writers
for writer in writers:
    writer.start()
[... initializing mongo db connection ...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
    mongo_input_result_q.put(row)
    count += 1
    if count % log_counter == 0:
        print 'Mongo Reader' + " " + str(count)
print "MongoReader done"

# Processes are terminated when they read "None" object from queue.
# Now that reading is finished, put None for each mapper in the queue so they
# terminate themselves; the same for all followup processes
for mapper in mappers:
    mongo_input_result_q.put(None)
for mapper in mappers:
    mapper.join()

for converter in converters:
    mapper_result_q.put(None)
for converter in converters:
    converter.join()

for writer in writers:
    converter_result_q.put(None)
for writer in writers:
    writer.join()
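For reference, every worker's run method follows the same sentinel pattern, roughly like this sketch (the generic Worker class is illustrative, not my real code):

import multiprocessing

class Worker(multiprocessing.Process):
    # Generic sketch: read from in_q until the None sentinel arrives,
    # push results to out_q, then exit.
    def __init__(self, in_q, out_q):
        multiprocessing.Process.__init__(self)
        self.in_q = in_q
        self.out_q = out_q

    def process(self, item):
        return item  # the actual mapping / converting / inserting happens here

    def run(self):
        while True:
            item = self.in_q.get()
            if item is None:  # sentinel: no more data, shut down
                break
            self.out_q.put(self.process(item))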
Why not let the Process take care of its own exceptions, like this:
from __future__ import print_function
import multiprocessing as mp
import traceback


class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception
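Note that the traceback is formatted to a string with traceback.format_exc() inside the child before sending: traceback objects are not picklable, so they cannot travel through the Pipe directly, whereas the formatted string (and most exception instances) can.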
Now you have both the error and the traceback at hand:
def target():
    raise ValueError('Something went wrong...')

p = Process(target=target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)
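Applied to the pipeline in the question, the parent can then poll its workers and tear everything down on the first failure. A sketch (the watch helper, the poll interval and the terminate() strategy are illustrative, not the only way to do it):

import time

def watch(processes, poll_interval=0.5):
    # Poll until all workers have finished; on the first child exception,
    # stop the remaining children and re-raise the error in the parent.
    while any(p.is_alive() for p in processes):
        for p in processes:
            if p.exception:
                error, tb = p.exception
                for other in processes:
                    other.terminate()  # kill the rest of the pipeline
                print(tb)              # full traceback from the child
                raise error            # re-raise the child's exception here
        time.sleep(poll_interval)

In the question's setup, something like watch(mappers + converters + writers) would cover all three stages at once.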
Regards, Marek