Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to work around the Queue corruption when using Process.Terminate()

I'm building a Python script/application which launches multiple so-called Fetchers. They in turn do something and return data into a queue.

I want to make sure the Fetchers don't run for more than 60 seconds (because the entire application runs multiple times in one hour).

Reading the Python docs i noticed they say to be carefull when using Process.Terminate() because it can break the Queue.

My current code:

# Result Queue
resultQueue = Queue();

# Create Fetcher Instance
fetcher = fetcherClass()

# Create Fetcher Process List
fetcherProcesses = []

# Run Fetchers
for config in configList:
    # Create Process to encapsulate Fetcher
    log.debug("Creating Fetcher for Target: %s" % config['object_name'])
    fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))

    log.debug("Starting Fetcher for Target: %s" % config['object_name'])
    fetcherProcess.start()
    fetcherProcesses.append((config, fetcherProcess))

# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
    log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
    fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
    if fetcherProcess.is_alive():
        log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
        fetcherProcess.terminate()

# Loop thru results, and save them in RRD
while not resultQueue.empty():
    config, fetcherResult = resultQueue.get()
    result = storage.Save(config, fetcherResult)

I want to make sure my Queue doesn't get corrupted when one of my Fetchers times out.

What is the best way to do this?

Edit: In response to a chat with sebdelsol a few clarifications:

1) I want to start processing data as soon as possible, because otherwise i have to perform a lot of Disk Intensive operations all at once. So sleeping the main thread for X_Timeout is not an option.

2) I need to wait for the Timeout only once, but per process, so if the main thread launches 50 fetchers, and this takes a few seconds to half a minute, i need to compensate.

3) I want to make sure the data that comes from Queue.Get() is put there by a Fetcher that didn't timeout (since it is theoretically possible that a fetcher was putting the data in the Queue, when the timeout occured, and it was shot to death...) That data should be dumped.

It's not a very bad thing when a timeout occurs, it's not a desirable situation, but corrupt data is worse.

like image 336
Thijs Cramer Avatar asked Sep 12 '14 11:09

Thijs Cramer


1 Answers

You could pass a new multiprocessing.Lock() to every fetcher you start.

In the fetcher's process, be sure to wrap the Queue.put() with this lock:

with self.lock:
    self.queue.put(result)

When you need to terminate a fetcher's process, use its lock :

with fetcherLock:
    fetcherProcess.terminate()

This way, your queue won't get corrupted by killing a fetcher during the queue access.

Some fetcher's locks could get corrupted. But, that's not an issue since every new fetcher you launch has a brand new lock.

like image 71
sebdelsol Avatar answered Oct 19 '22 23:10

sebdelsol