
Python RQ: pattern for callback

I have now a big number of documents to process and am using Python RQ to parallelize the task.

I would like a pipeline of work to be done as different operations are performed on each document. For example: A -> B -> C means pass the document to function A; after A is done, proceed to B, and last to C.

However, Python RQ does not seem to support the pipeline stuff very nicely.

Here is a simple but somewhat dirty way of doing this. In short, each function along the pipeline calls the next one in a nested way.

For example, for a pipeline A -> B -> C.

At the top level, some code is written like this:

q.enqueue(A, the_doc)

where q is the Queue instance, and in function A there is code like:

q.enqueue(B, the_doc)

And in B, there is something like this:

q.enqueue(C, the_doc)
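Put together, the whole nesting looks roughly like this (just a sketch; the actual per-document processing inside A, B and C is omitted, and the_doc stands in for whatever document object is being passed along):

from redis import StrictRedis
from rq import Queue

conn = StrictRedis()
q = Queue(connection=conn)

def C(the_doc):
    pass  # last stage: process the_doc, nothing left to enqueue

def B(the_doc):
    # ... process the_doc ...
    q.enqueue(C, the_doc)  # B schedules the next stage itself

def A(the_doc):
    # ... process the_doc ...
    q.enqueue(B, the_doc)  # A schedules the next stage itself

# at the top level, only A is enqueued explicitly
the_doc = 'some document'
q.enqueue(A, the_doc)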

Is there a more elegant way than this? For example, some code in ONE function:

q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after=A)
q.enqueue(C, the_doc, after=B)

The depends_on parameter is the closest to my requirement; however, running something like:

A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)

won't work, since q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc). By the time B is enqueued, the result from A might not be ready, as it takes time to process.

PS:

If Python RQ is not really good at this, what other tool in Python can I use to achieve the same purpose:

  1. round-robin parallelization
  2. pipeline processing support
asked Jun 18 '14 by xiaohan2012


1 Answer

By the time B is enqueued, the result from A might not be ready as it takes time to process.

I'm not sure if this was actually true when you originally posted the question, but in any case, it is not true now. In fact, the depends_on feature is made exactly for the workflow you described.

It is true that these two enqueue calls are executed immediately, one after the other:

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)

But the worker will not execute B until A is finished. Until A_job has finished successfully, B_job.status == 'deferred'. Once A_job.status == 'finished', B will start to run.
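You can observe this from the producer side as well. A small sketch (using Job.get_status(), which reports a job's current state in recent RQ versions, and the_doc as in the question):

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)

print(B_job.get_status())  # 'deferred' -- waiting for A_job to finish
# ... after a worker finishes A_job ...
print(B_job.get_status())  # 'queued', then 'started', then 'finished'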

This means that B and C can access and operate on the result of their dependencies like this:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)  # simulate a long-running task
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    # look up the job this one depends on (A) and read its result
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'

def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    # same pattern: fetch the result of the dependency (B)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'

The worker will eventually print 'result A result B result C'.
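For completeness, the chain itself is then enqueued in one place, something like this (a sketch; it assumes the three functions above live in a module the worker can import):

a_job = q.enqueue(A)
b_job = q.enqueue(B, depends_on=a_job)
c_job = q.enqueue(C, depends_on=b_job)
# once a worker has processed all three, c_job.result == 'result A result B result C'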

Also, if you have many jobs in the queue and B might be waiting a while before being executed, you might want to significantly increase result_ttl or make it indefinite with result_ttl=-1. Otherwise, the result of A will be purged after however many seconds are set for result_ttl, in which case B will no longer be able to access it and return the desired result.

Setting result_ttl=-1 has important memory implications, however. It means the results of your jobs will never be automatically purged, and memory will grow proportionately until you manually remove those results from Redis.
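For example, result_ttl can be passed per job at enqueue time (a sketch; the 86400-second value is just illustrative):

a_job = q.enqueue(A, result_ttl=-1)                       # keep A's result until removed manually
b_job = q.enqueue(B, depends_on=a_job, result_ttl=86400)  # keep B's result for one day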

answered Oct 13 '22 by chishaku