Basically I have a lot of tasks (in batches of about 1000) and the execution times of these tasks can vary widely (from less than a second to 10 minutes). I know that if a task has been executing for more than a minute, I can kill it. These tasks are steps in the optimization of some data mining model (but are independent of each other), and they spend most of their time inside some C extension function, so they would not cooperate if I tried to kill them gracefully.
Is there a distributed task queue that fits that scheme? AFAIK, Celery only allows aborting tasks that are willing to cooperate. But I might be wrong.
I recently asked a similar question about killing hanging functions in pure Python: Kill hanging function in Python in multithreaded enviorment.
I guess I could subclass the Celery Task class so that it spawns a new process and then executes its payload, aborting the payload's execution if it takes too long, but then I would be killed by the overhead of initializing a new interpreter for every task.
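To make that concrete, the spawn-and-kill approach I have in mind is roughly this (a minimal sketch with plain multiprocessing; run_with_timeout and its parameters are just illustrative names):

import multiprocessing

def run_with_timeout(target, args=(), timeout=60):
    # Run the payload in a fresh process so it can be killed
    # even while it is stuck inside a C extension.
    proc = multiprocessing.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()  # SIGTERM; proc.kill() (Python 3.7+) sends SIGKILL
        proc.join()
        raise TimeoutError('task exceeded %d seconds' % timeout)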
A distributed task queue allows you to offload work to another process, to be handled asynchronously (once you push the work onto the queue, you don't wait for it) and in parallel (you can use other cores to process the work).
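A minimal sketch of what that offloading looks like with Celery (the broker URL here is just an example; any supported broker works):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # example broker

@app.task
def add(x, y):
    return x + y

# The caller returns immediately; a worker process executes the task.
result = add.delay(2, 3)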
queue is a standard-library module of Python that implements in-process queues; queue.Queue(maxsize) creates a queue that holds at most maxsize items.
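Note that this module is in-process only, not distributed. A quick illustration:

import queue

q = queue.Queue(maxsize=10)  # put() blocks once 10 items are waiting
q.put('job-1')
q.put('job-2')
print(q.get())  # 'job-1' (FIFO order)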
Task queues let applications perform work, called tasks, asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues; the tasks are executed later by worker services.
Queues don't inherently have the idea of being complete or done; they can be used indefinitely. To close one up when you are done, you will indeed need to put None or some other magic value at the end and write the logic to check for it, as you described. The ideal way would probably be to subclass the Queue object.
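A short sketch of that sentinel pattern (SENTINEL and the dummy work items are illustrative):

import queue
import threading

q = queue.Queue()
SENTINEL = None  # the "magic value" that marks the end of the work

def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            break  # producer signalled that the queue is done
        print('processing', item)

t = threading.Thread(target=consumer)
t.start()
for item in range(5):
    q.put(item)
q.put(SENTINEL)  # close the queue up
t.join()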
Celery supports time limiting: you can use time limits to kill long-running tasks. Besides killing tasks outright, you can use soft limits, which let you handle the SoftTimeLimitExceeded exception inside the task and terminate it cleanly.
from celery.task import task
from celery.exceptions import SoftTimeLimitExceeded

@task(soft_time_limit=60)  # raise SoftTimeLimitExceeded in the task after 60s
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()
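Note that the soft limit is delivered as an exception raised inside the task (via a signal on the worker), so code that is stuck inside a C extension and never returns to the interpreter may not get a chance to handle it. For tasks like yours, the hard time_limit is the one that guarantees the kill: when it expires, the worker forcefully terminates the task's process and spawns a replacement, with no cooperation needed. Sketched (the 60 is just your one-minute cutoff):

@task(time_limit=60)  # hard limit: the process is killed outright after 60s
def mytask():
    do_work()  # may block in C code; the kill does not require cooperation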