Python / rq - monitoring worker status

If this is an idiotic question, I apologize and will go hide my head in shame, but:

I'm using rq to queue jobs in Python. I want it to work like this:

  1. Job A starts. Job A grabs data via a web API and stores it.
  2. Job A runs.
  3. Job A completes.
  4. Upon completion of job A, job B starts. Job B checks each record stored by job A and adds some additional response data.
  5. Upon completion of job B, the user gets a happy e-mail saying their report's ready.

My code so far:

from redis import Redis
from rq import Queue, Worker, use_connection

import getlinksmod  # my own module containing lsGet

redis_conn = Redis()
use_connection(redis_conn)
q = Queue('normal', connection=redis_conn)  # this is terrible, I know - fixing later
w = Worker(q)
job = q.enqueue(getlinksmod.lsGet, theURL, total, domainid)  # theURL, total, domainid defined elsewhere
w.work()

I assumed my best solution was to have two workers, one for job A and one for job B. The job B worker could monitor job A and, when job A was done, get started on job B.

What I can't figure out to save my life is how to get one worker to monitor the status of another. I can grab the job ID from job A with job.id. I can grab the worker name with w.name. But I haven't the foggiest idea how to pass any of that information to the other worker.

Or, is there a much simpler way to do this that I'm totally missing?

asked Aug 23 '12 by user1066609

People also ask

What is RQ worker?

RQ: Workers. A worker is a Python process that typically runs in the background and exists solely as a work horse to perform lengthy or blocking tasks that you don't want to perform inside web processes.
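
A minimal sketch of starting such a worker from Python (the 'normal' queue name is taken from the question; in practice you would usually run the worker in its own process):

from redis import Redis
from rq import Queue, Worker

redis_conn = Redis()
queue = Queue('normal', connection=redis_conn)
worker = Worker([queue], connection=redis_conn)
worker.work()  # blocks and processes jobs from 'normal' until stopped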

How does Django RQ work?

Django-RQ allows you to easily put jobs into any of the queues defined in settings.py. enqueue() returns a job object that provides a variety of information about the job's status, parameters, etc. enqueue() takes the function to be enqueued as the first parameter, followed by the arguments to pass to it.
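
A hedged sketch of that pattern (send_report is a hypothetical task function; it assumes a 'default' queue is configured in RQ_QUEUES in settings.py):

import django_rq

def send_report(report_id):
    print("building report", report_id)  # hypothetical task body

# Put the job on the 'default' queue defined in settings.py
job = django_rq.enqueue(send_report, 42)
print(job.id, job.get_status())  # the returned job object exposes id, status, args, etc.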

What is RQ dashboard?

rq-dashboard is a general-purpose, lightweight, Flask-based web front-end to monitor your RQ queues, jobs, and workers in real time.


2 Answers

Update January 2015: this pull request has now been merged, and the parameter has been renamed to depends_on, i.e.:

second_job = q.enqueue(email_customer, depends_on=first_job)
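
Applied to the workflow in the question, a sketch might look like this (getlinksmod.lsGet and its arguments come from the question; process_records and email_customer are hypothetical stand-ins for job B and the notification step):

job_a = q.enqueue(getlinksmod.lsGet, theURL, total, domainid)
job_b = q.enqueue(process_records, depends_on=job_a)  # runs only after job_a finishes
job_c = q.enqueue(email_customer, depends_on=job_b)   # the happy e-mail goes out last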

The original post left intact for people running older versions and such:

I have submitted a pull request (https://github.com/nvie/rq/pull/207) to handle job dependencies in RQ. When this pull request gets merged in, you'll be able to do:

def generate_report():
    pass

def email_customer():
    pass

first_job = q.enqueue(generate_report)
second_job = q.enqueue(email_customer, after=first_job)
# In the second enqueue call, the job is created immediately,
# but it is only moved into the queue after first_job finishes

For now, I suggest writing a wrapper function to sequentially run your jobs. For example:

def generate_report():
    pass

def email_customer():
    pass

def generate_report_and_email():
    generate_report()
    email_customer() # You can also enqueue this function, if you really want to

# Somewhere else
q.enqueue(generate_report_and_email)
answered Sep 28 '22 by Selwin Ong

From this page on the rq docs, it looks like each job object has a result attribute, accessible as job.result, which you can check. If the job hasn't finished, it'll be None. But if you make sure your job returns some value (even just "Done"), you can have your other worker check the result of the first job and begin working only when job.result has a value, meaning the first worker has completed.
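
A minimal sketch of that polling approach (wait_for_result is a hypothetical helper; it assumes the first job's ID has been handed to the second worker and that the first job returns a non-None value when it finishes):

from time import sleep
from redis import Redis
from rq.job import Job

redis_conn = Redis()

def wait_for_result(job_id, poll_interval=5):
    # Re-fetch the job from Redis each time and poll until it has a result
    while True:
        job = Job.fetch(job_id, connection=redis_conn)
        if job.result is not None:
            return job.result
        sleep(poll_interval)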

answered Sep 28 '22 by jdotjdot