I'm building a Python application that launches multiple so-called Fetchers. Each of them does some work and returns its data into a queue.
I want to make sure the Fetchers don't run for more than 60 seconds (because the entire application runs several times per hour).
Reading the Python docs, I noticed they warn to be careful when using Process.terminate() because it can break the queue.
My current code:
    from multiprocessing import Process, Queue

    # Result queue
    resultQueue = Queue()

    # Create Fetcher instance
    fetcher = fetcherClass()

    # Create Fetcher process list
    fetcherProcesses = []

    # Run Fetchers
    for config in configList:
        # Create a Process to encapsulate the Fetcher
        log.debug("Creating Fetcher for Target: %s" % config['object_name'])
        fetcherProcess = Process(target=fetcher.Run, args=(config, resultQueue))
        log.debug("Starting Fetcher for Target: %s" % config['object_name'])
        fetcherProcess.start()
        fetcherProcesses.append((config, fetcherProcess))

    # Wait for all workers to complete
    for config, fetcherProcess in fetcherProcesses:
        log.debug("Waiting for Fetcher to complete (%s)." % str(config['object_name']))
        fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
        if fetcherProcess.is_alive():
            log.critical("Fetcher for object %s timed out! Terminating..." % config['object_name'])
            fetcherProcess.terminate()

    # Loop through the results and save them in RRD
    while not resultQueue.empty():
        config, fetcherResult = resultQueue.get()
        result = storage.Save(config, fetcherResult)
I want to make sure my Queue doesn't get corrupted when one of my Fetchers times out.
What is the best way to do this?
Edit: following a chat with sebdelsol, a few clarifications:
1) I want to start processing data as soon as possible, because otherwise I have to perform a lot of disk-intensive operations all at once. So sleeping the main thread for X_Timeout is not an option.
2) I need to wait for the timeout only once, but per process: if the main thread launches 50 fetchers, and that takes anywhere from a few seconds to half a minute, I need to compensate.
3) I want to make sure the data that comes from Queue.get() was put there by a fetcher that didn't time out. It is theoretically possible that a fetcher was putting data into the queue at the moment the timeout occurred and it was shot to death; that data should be dumped.
A timeout is not a very bad thing in itself; it's not a desirable situation, but corrupt data is worse.
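To address points 2 and 3 together, one possible approach (a sketch only; reap_fetchers, drain_results, and the (config, process, start_time) entry format are illustrative names, not part of the code above) is to record each fetcher's start time with time.monotonic(), join each process against its own deadline, and collect the names of the fetchers that timed out so their queue items can be dumped:

```python
import time

FETCHER_TIMEOUT = 60  # assumed per-fetcher budget, in seconds

def reap_fetchers(entries, timeout=FETCHER_TIMEOUT):
    """Join each fetcher against its own start time and return the set
    of object names whose fetcher timed out.

    `entries` is assumed to be a list of (config, process, start_time)
    tuples, with start_time taken via time.monotonic() just before
    process.start().  Each fetcher therefore gets the full `timeout`
    measured from its own launch, even when starting 50 of them takes
    half a minute.
    """
    timed_out = set()
    for config, proc, started in entries:
        remaining = started + timeout - time.monotonic()
        proc.join(max(0.0, remaining))
        if proc.is_alive():
            proc.terminate()
            timed_out.add(config['object_name'])
    return timed_out

def drain_results(queue, timed_out):
    """Yield only results put there by fetchers that finished in time;
    anything tagged with a timed-out fetcher's config is dumped."""
    while not queue.empty():
        config, result = queue.get()
        if config['object_name'] not in timed_out:
            yield config, result
```

This relies on every fetcher putting (config, result) tuples on the queue, which the code above already does, so no change to the fetchers themselves is needed for the filtering part.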
You could pass a new multiprocessing.Lock() to every fetcher you start.
In the fetcher's process, be sure to wrap the Queue.put() with this lock:

    with self.lock:
        self.queue.put(result)

When you need to terminate a fetcher's process, acquire its lock first:

    with fetcherLock:
        fetcherProcess.terminate()

This way, your queue won't get corrupted by killing a fetcher in the middle of a queue access.
Some fetchers' locks may end up in a corrupted state, but that's not an issue, since every new fetcher you launch gets a brand-new lock.
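Put together, the pattern might look like the sketch below (fetcher_run, the example configs, and the 60-second timeout are illustrative stand-ins for your fetcher class and configList, not your actual code):

```python
from multiprocessing import Process, Queue, Lock

def fetcher_run(config, queue, lock):
    # Hypothetical fetcher body: produce a result, then publish it
    # while holding the lock, so terminate() can never land mid-put().
    result = {'target': config['object_name'], 'data': 42}
    with lock:
        queue.put((config, result))

if __name__ == '__main__':
    resultQueue = Queue()
    workers = []
    for config in [{'object_name': 'alpha'}, {'object_name': 'beta'}]:
        lock = Lock()  # a brand-new lock for every fetcher
        proc = Process(target=fetcher_run, args=(config, resultQueue, lock))
        proc.start()
        workers.append((config, proc, lock))

    for config, proc, lock in workers:
        proc.join(60)
        if proc.is_alive():
            # Holding the lock guarantees the fetcher is not inside
            # its put() when we kill it.
            with lock:
                proc.terminate()

    while not resultQueue.empty():
        config, result = resultQueue.get()
        print('%s -> %s' % (config['object_name'], result))
```

Note that multiprocessing.Queue.empty() is only a snapshot (items put by a child may still be in flight through the feeder thread), so for anything stricter than this sketch you may prefer get() with a timeout over the empty()/get() loop.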