I've started using RQ/Redis to run some long-running jobs asynchronously for my Django site. I'm hoping to do something like the following:
- I want one queue for each instance of a model. You can think of this model as an API user account (there will not be many of these: 15 to 20 at most).
- I will be distributing batches of tasks (anywhere from 10 to 500) evenly across the queues. Multiple batches may be added before the first batch completes.
- With each batch, I would like to start a worker for each queue that is not actively being worked on, and run these workers in burst mode so that they shut down once they run out of tasks.
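Distributing a batch evenly across the queues can be as simple as dealing the tasks out round-robin. The following is a minimal plain-Python sketch; `distribute_round_robin` is a hypothetical helper, not part of RQ, and it only decides which queue name each task goes to.

```python
from itertools import cycle


def distribute_round_robin(tasks, queue_names):
    """Assign tasks to queue names in round-robin order.

    Returns a dict mapping each queue name to the list of tasks it should
    receive; the list lengths differ by at most one.
    """
    if not queue_names:
        raise ValueError("need at least one queue name")
    assignments = {name: [] for name in queue_names}
    # zip stops when the tasks run out; cycle repeats the queue names forever
    for task, name in zip(tasks, cycle(queue_names)):
        assignments[name].append(task)
    return assignments
```

Each list could then be enqueued on its own queue, e.g. `Queue(name, connection=redis_conn).enqueue(some_task, task)`, where `redis_conn` and `some_task` are placeholders. Because every call starts from the first name, leftover tasks always land on the first queues; keeping one `cycle` object alive across batches would spread them more evenly.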
I realize I could simply not run them in burst mode, in which case workers would always be listening for work on all of the queues. The problem is that I want to be able to add and remove queues dynamically, so it's better to start workers for whichever queues exist each time a batch is added.
I realize it might seem odd that I'm distributing tasks across the queues, but the reason is that tasks in the same queue must be rate limited (throttled) according to a service I'm using. Think of it as an API rate limit, where each queue represents a different account. For my purposes it makes no difference which account a task runs on, so I might as well parallelize across all the accounts.
The problem I'm facing is that if I start a worker on a queue that is already being worked on, I now have two workers operating independently on that queue, so the effective request rate for that account doubles and my throttling no longer holds. How can I start a worker only if no worker is already operating on that queue? I could probably find a hacky solution, but I'd prefer to handle it the "right" way, and since I don't have much experience with queues I thought I should ask.
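To see why a second worker breaks the limit, consider a throttle that keeps its timing state in memory. This is a hypothetical sketch, not the asker's implementation; `Throttle` and `min_interval` are made-up names. Each worker process would hold its own `Throttle`, so two workers that each allow one call per second together make two calls per second. Note also that RQ's default `Worker` runs each job in a forked child process, so in-memory state like this would not survive from one job to the next; the sketch assumes a non-forking worker such as RQ's `SimpleWorker`, otherwise the timestamp would have to live in Redis.

```python
import time


class Throttle:
    """Enforce a minimum interval between calls made through this object.

    The timestamp is stored on the instance, i.e. per process, so it does
    not coordinate with other workers on the same queue.
    """

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock  # injectable so the timing can be tested
        self._sleep = sleep
        self._last_call = None

    def wait(self):
        """Block until min_interval seconds have passed since the last call."""
        now = self._clock()
        if self._last_call is not None:
            remaining = self._last_call + self.min_interval - now
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last_call = now
```

A task would call `throttle.wait()` just before each request to the rate-limited service.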
I am already writing my own worker so that I can control the queues dynamically, so I just need a way to add logic that prevents a queue that is already being worked on from being given a new worker. A simple version of my worker is here:
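```python
# custom_worker.py
# Assumes Django is already configured (DJANGO_SETTINGS_MODULE set and
# django.setup() called) before the models below are imported.
from redis import Redis
from rq import Queue, Worker

from Api.models import ApiUser
# importing the necessary namespace for the tasks to run
from tasks import *

redis_conn = Redis()

# dynamically getting the queue names in which I am expecting tasks
queue_names = [user.name for user in ApiUser.objects.all()]
qs = [Queue(name, connection=redis_conn) for name in queue_names] or [Queue(connection=redis_conn)]
w = Worker(qs, connection=redis_conn)
w.work(burst=True)
```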
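Finding a solution just meant digging into python-rq's source code a bit: `Worker.all()` returns every worker currently registered in Redis, and each worker's `queues` attribute lists the queues it listens on. I might look into improving the documentation. Anyway, this seems to work for my needs: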
```python
# Assumes Django is already configured before the models below are imported.
from redis import Redis
from rq import Queue, Worker

from Api.models import ApiUser
# importing the necessary namespace for the tasks to run
from tasks import *

redis_conn = Redis()

# Every queue that some registered worker is already listening on
working_queues = {queue.name
                  for worker in Worker.all(connection=redis_conn)
                  for queue in worker.queues}
proposed_queues = [user.name for user in ApiUser.objects.all()]
# Only start work on queues that nobody is listening on yet
queues_to_start = [name for name in proposed_queues if name not in working_queues]

if queues_to_start:
    qs = [Queue(name, connection=redis_conn) for name in queues_to_start]
    w = Worker(qs, connection=redis_conn)
    w.work(burst=True)
else:
    print("Nothing to do here.")
```
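One caveat: a single RQ `Worker` listening on several queues still performs one job at a time, taking from its queues in the order given, so the idle accounts above are not worked in parallel. If one worker per idle queue is wanted, as the question describes, a sketch using `multiprocessing` might look like this. The function names are hypothetical, and a Redis server on the default `localhost:6379` is assumed.

```python
from multiprocessing import Process


def idle_queue_names(proposed, busy):
    """Return the proposed queue names that are not in `busy`, keeping order."""
    busy = set(busy)
    return [name for name in proposed if name not in busy]


def run_burst_worker(queue_name):
    """Run one RQ burst worker on a single queue; returns once the queue is empty."""
    # Local imports keep idle_queue_names usable without rq installed; the
    # Redis connection is created here, in the child process, not inherited.
    from redis import Redis
    from rq import Queue, Worker

    redis_conn = Redis()  # assumption: Redis on localhost:6379
    queue = Queue(queue_name, connection=redis_conn)
    Worker([queue], connection=redis_conn).work(burst=True)


def start_idle_workers(proposed):
    """Start one burst worker process per idle queue and wait for all of them."""
    from redis import Redis
    from rq import Worker

    redis_conn = Redis()
    busy = {queue.name
            for worker in Worker.all(connection=redis_conn)
            for queue in worker.queues}
    processes = [Process(target=run_burst_worker, args=(name,))
                 for name in idle_queue_names(proposed, busy)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return len(processes)
```

From the Django-aware launcher this could be called as `start_idle_workers([user.name for user in ApiUser.objects.all()])`, inside an `if __name__ == "__main__":` guard so it also works on platforms where `multiprocessing` spawns instead of forking. Note that the check and the start are not atomic: two launchers run at nearly the same moment could both see a queue as idle before either worker registers. If that can happen, serialising the launcher (for example with redis-py's `Redis.lock`) would close the gap.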