 

How to create multiple workers in Python-RQ?

We were recently forced to replace Celery with RQ because it is simpler and Celery was giving us too many problems. Now we cannot find a way to create multiple queues dynamically, and we need multiple jobs to run concurrently. Basically, every request to one of our routes should start a job, and it doesn't make sense to have multiple users wait for one user's job to finish before we can proceed with the next ones. We periodically send a request to the server to get the job's status and some metadata, which lets us show the user a progress bar (it can be a lengthy process, so this has to be done for the sake of UX).

We are using Django and Python's rq library. We are not using django-rq (please let me know if there are advantages to using it).

So far we start a task in one of our controllers like:

from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(render_task, new_render.pk, domain=domain, data=csv_data, timeout=1200)

Then in our render_task method we add metadata to the job based on the state of the long-running task:

from rq import get_current_job

current_job = get_current_job()
current_job.meta['state'] = 'PROGRESS'
current_job.meta['process_percent'] = process_percent
current_job.meta['message'] = 'YOUTUBE'
current_job.save()  # persist the updated meta so the polling endpoint can read it

Now we have another endpoint that fetches the current job and its metadata and passes it back to the client (this happens through a periodic AJAX request).
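Roughly, that endpoint looks something like the sketch below (simplified; the view name and the idea of passing the job id back from the client are illustrative, not our exact code):

from django.http import JsonResponse
from redis import Redis
from rq.job import Job

def job_status(request, job_id):
    # Look the job up by the id the client received when the job was enqueued
    job = Job.fetch(job_id, connection=Redis())
    return JsonResponse({
        'status': job.get_status(),
        'meta': job.meta,  # state / process_percent / message set inside render_task
    })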

How do we go about running jobs concurrently without them blocking one another? Should we create queues dynamically? Is there a way to make use of Workers to achieve this?

asked Sep 16 '15 by RazorHead

3 Answers

As far as I know, RQ does not have any facility to manage multiple workers. You have to start a new worker process and define which queue it will consume. One way of doing this that works pretty well for me is using Supervisor. In Supervisor you configure a worker for a given queue and the number of processes to run for concurrency. For example, you can have a queue "high-priority" with 5 workers and a queue "low-priority" with 1 worker.
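A minimal Supervisor config along those lines might look like this (program names and the virtualenv path are placeholders, adapt them to your setup):

[program:rq-high]
; 5 worker processes consuming the "high-priority" queue
command=/path/to/venv/bin/rqworker high-priority
process_name=%(program_name)s_%(process_num)02d
numprocs=5
autostart=true
autorestart=true

[program:rq-low]
; 1 worker process consuming the "low-priority" queue
command=/path/to/venv/bin/rqworker low-priority
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true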

answered by marcin_koss


It is not only possible but ideal to run multiple workers. I use a bash file for the start command to enter the virtual env, and launch with a custom Worker class.

Here's a supervisor config that has worked very well for me for RQ workers, under a production workload as well. Note that startretries is high since this runs on AWS and needs retries during deployments.

[program:rq-workers]
process_name=%(program_name)s_%(process_num)02d
command=/usr/local/bin/start_rq_worker.sh
autostart=true
autorestart=true
user=root
numprocs=5
startretries=50
stopsignal=INT
killasgroup=true
stopasgroup=true
stdout_logfile=/opt/elasticbeanstalk/tasks/taillogs.d/super_logs.conf
redirect_stderr=true

Contents of start_rq_worker.sh

#!/bin/bash
date > /tmp/date
source /opt/python/run/venv/bin/activate
source /opt/python/current/env
/opt/python/run/venv/bin/python /opt/python/current/app/manage.py rqworker --worker-class rq.SimpleWorker default
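
The script above launches the stock rq.SimpleWorker; if you want the custom Worker class I mentioned, a minimal sketch looks like this (the module and class names are just examples):

# myapp/workers.py  (hypothetical module)
from rq import Worker

class MyWorker(Worker):
    # Override hooks here, e.g. to set up logging or connections per job
    def execute_job(self, job, queue):
        return super().execute_job(job, queue)

You would then point the command at it with rqworker --worker-class myapp.workers.MyWorker default.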
answered by Dougyfresh


I would like to suggest a very simple solution using django-rq:

Sample settings.py

...

RQ_QUEUES = {
    'default': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
    'low': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    }
}

...

Run Configuration

Run python manage.py rqworker default low as many times (each time in its own shell, or as its own Docker container, for instance) as the number of desired workers. The order of queues in the command determines their priority. At this point, all workers are listening to both queues.

In the Code

When calling a job to run, pass in the desired queue:

For high/normal priority jobs, you can make the call without any parameters, and the job will enter the default queue. For low priority, you must specify the queue, either at the job level:

from django_rq import job

@job('low')
def my_low_priority_job():
    ...  # some code

And then call my_low_priority_job.delay().

Alternatively, determine priority when calling:

import django_rq

queue = django_rq.get_queue('low')
queue.enqueue(my_variable_priority_job)
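
For completeness, the default-queue case described above is just a plain enqueue with no queue specified (the function name here is hypothetical):

import django_rq

# With no queue given, the job lands on the 'default' queue
django_rq.enqueue(my_normal_priority_job)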
answered by ygesher