In Python 2.7 I have implemented a multiprocessing scenario with multiple queues and consumers. The simplified idea is that I have a producer of jobs, which are fed to a consumer that handles the jobs, and an error handler that does all the logging. Very simplified, it looks something like this:

import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try:
            element = job_queue.get_nowait()
            print element
        except:
            # t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
            # t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target = error_handler, args = (error_queue, ))
    p1.start()
    p2 = mp.Process(target = job_handler, args = (job_queue, error_queue))
    p2.start()

This basically works, but in my more complex program there is a very long delay (about 5 min) between the two comment markers t1 and t2. So I have two questions:
Do I need to call close() and join_thread() on all used Queue objects to indicate that I am done using them? I think that subprocesses do that implicitly when I end them, for example by returning, as stated here:

join_thread(): Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.

I found the following in the docs (docs.python.org):

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

As I understand it, a process (here p2, the job_handler) should not exit immediately after putting items on a queue, so that queued data is not lost. I could not find any explanation for the sentence "Otherwise you cannot be sure that processes which have put items on the queue will terminate."
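
The rule quoted above can be sketched as follows. This is only a minimal illustration, not the code from the question: the `producer` function, the `drain_then_join` helper and its `ctx` parameter are hypothetical names, and the sketch assumes the parent knows how many items to expect.

```python
import multiprocessing as mp


def producer(queue, count):
    # Hypothetical worker: puts `count` integers on the queue, then returns.
    for i in range(count):
        queue.put(i)


def drain_then_join(count, ctx=mp):
    # `ctx` is assumed to be the multiprocessing module or a context
    # object; both provide Queue and Process.
    queue = ctx.Queue()
    child = ctx.Process(target=producer, args=(queue, count))
    child.start()
    # Remove every item before calling join(). Joining first risks a
    # deadlock when the child still holds buffered items.
    items = [queue.get() for _ in range(count)]
    child.join()
    return items, child.exitcode


if __name__ == '__main__':
    items, code = drain_then_join(5)
    print(items)
    print(code)
```

The order matters here: all `get()` calls happen before `join()`, so the child's feeder thread can always flush its buffer and the child can terminate.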
Besides the above, I want to comment on your code. I realize that this code is simplified.

Avoid placing code that is executed on startup outside of if __name__ == '__main__':

From the docs: Safe importing of main module. One should protect the “entry point” of the program by using if __name__ == '__main__':

In your code, this applies to:

job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)

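
One possible way to follow this advice is sketched below. It is an assumption-laden rewrite, not the original program: the `main` function, its `ctx` parameter and the `None` sentinel that ends the job stream are all introduced for illustration only (the question uses get_nowait() and a bare except instead).

```python
import multiprocessing as mp


def job_handler(job_queue, error_queue):
    # Handle jobs until the None sentinel arrives, then report and return.
    while True:
        element = job_queue.get()
        if element is None:
            break
    error_queue.put('Error')


def main(ctx=mp):
    # The queues are created and filled here, not at import time.
    # `ctx` is assumed to be the multiprocessing module or a context object.
    job_queue = ctx.Queue()
    error_queue = ctx.Queue()
    for i in range(10):
        job_queue.put(i)
    job_queue.put(None)  # sentinel, an assumption of this sketch

    p = ctx.Process(target=job_handler, args=(job_queue, error_queue))
    p.start()
    message = error_queue.get()  # drain the queue before joining
    p.join()
    return message, p.exitcode


if __name__ == '__main__':
    print(main())
```

With this layout, importing the module has no side effects, and everything that touches the queues runs only when the file is executed as a script.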
.close() in def job_handler/error_handler

except:
    ...
    job_queue.close()

This is wrong, as the job_handler process never puts messages on job_queue. The same applies to the error_handler process and error_queue.close().

From the docs:
Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.

.join_thread() in def job_handler/error_handler

This is useless, as the job_handler process doesn't put messages on job_queue. Therefore .join_thread() does nothing. This is also true for the error_handler process and error_queue.

except:
    ...
    job_queue.join_thread()
    # t2

def error_handler(error_queue):
    ...
    error_queue.close()
    error_queue.join_thread()

Use exit(1) instead of return 1

The error code 1 passed to return 1 cannot be read from p2.exitcode, because the return value of a process's target function is discarded. Think of a process as a program of its own rather than as a function.
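
The difference can be checked with a small sketch. The function names and the `ctx` parameter are hypothetical, and sys.exit(1) is used in place of exit(1), since exit is supplied by the site module and mainly meant for interactive use.

```python
import multiprocessing as mp
import sys


def returns_one():
    # The return value of a Process target is discarded.
    return 1


def exits_one():
    # sys.exit(1) raises SystemExit, which becomes the process exit code.
    sys.exit(1)


def exit_code_of(target, ctx=mp):
    # `ctx` is assumed to be the multiprocessing module or a context object.
    p = ctx.Process(target=target)
    p.start()
    p.join()
    return p.exitcode


if __name__ == '__main__':
    print(exit_code_of(returns_one))  # 0
    print(exit_code_of(exits_one))    # 1
```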
Try the following (it requires import time at the top of the module):
# t1
error_queue.put('Error')
error_queue.close()
# Give the error_handler a chance to get a timeslice
time.sleep(0.2)
error_queue.join_thread()
#job_queue.close()
#job_queue.join_thread()
# t2
exit(1)
Tested with Python 3.4.2 and Python 2.7.9.