I have a large number of documents to process and am using Python RQ to parallelize the task.
I would like a pipeline of work, where a different operation is performed on each document at each stage. For example, A -> B -> C means: pass the document to function A; after A is done, proceed to B; and finally to C.
However, Python RQ does not seem to support this kind of pipeline very nicely.
Here is a simple but somewhat dirty way of doing this. In short, each function along the pipeline enqueues its successor, in a nested fashion.
For example, for a pipeline A -> B -> C, the top-level code looks like this:
q.enqueue(A, the_doc)
where q is the Queue instance, and inside function A there is code like:
q.enqueue(B, the_doc)
And inside B, something like this:
q.enqueue(C, the_doc)
Is there a more elegant way to do this? For example, some code in ONE function:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after=A)
q.enqueue(C, the_doc, after=B)
The depends_on parameter is the closest to my requirement; however, running something like:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)
won't work, as q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc). By the time B is enqueued, the result from A might not be ready, as it takes time to process.
PS: If Python RQ is not really good at this, what other tool in Python can I use to achieve the same purpose?
"By the time B is enqueued, the result from A might not be ready as it takes time to process."
I'm not sure if this was actually true when you originally posted the question, but in any case it is not true now. In fact, the depends_on feature is made exactly for the workflow you described.
It is true that these two functions are executed immediately in succession.
A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)
But the worker will not execute B until A is finished. Until A_job has successfully executed, B_job's status is 'deferred'. Once A_job's status is 'finished', B will start to run.
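For example, the deferred-to-finished transition can be watched by polling each job's status. The helper below is a minimal sketch (wait_until_finished is a hypothetical name, not part of RQ); it assumes job objects like A_job and B_job from the lines above:

```python
import time

def wait_until_finished(job, timeout=300, poll=1.0):
    # Poll the job's status until it reaches a terminal state.
    # Possible statuses include 'deferred', 'queued', 'started',
    # 'finished' and 'failed'.
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = job.get_status()
        if status in ('finished', 'failed'):
            return status
        time.sleep(poll)
    raise TimeoutError('job %s did not finish in time' % job.id)

# B_job stays 'deferred' until A_job finishes, e.g.:
# wait_until_finished(A_job)
# assert B_job.get_status() != 'deferred'
```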
This means that B and C can access and operate on the results of their dependencies, like this:
import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    # Look up the job this one depended on and fetch its result.
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'

def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'
The worker will eventually print 'result A result B result C'.
Also, if you have many jobs in the queue and B might be waiting a while before being executed, you might want to significantly increase result_ttl or make it indefinite with result_ttl=-1. Otherwise, the result of A will be purged after however many seconds are set for result_ttl, in which case B will no longer be able to access it and return the desired result.
Setting result_ttl=-1 has important memory implications, however. It means the results of your jobs will never be automatically purged, and memory will grow proportionately until you manually remove those results from Redis.
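Putting the pieces together, a small helper can enqueue an arbitrary chain of functions with an explicit result_ttl (a sketch; enqueue_pipeline is a hypothetical helper, not part of RQ, and q is an rq.Queue as in the example above):

```python
def enqueue_pipeline(q, funcs, result_ttl=86400):
    # Enqueue funcs as a dependency chain on queue q. Each job keeps
    # its result for result_ttl seconds (-1 means keep indefinitely,
    # at the cost of unbounded Redis memory growth).
    jobs = []
    previous = None
    for func in funcs:
        if previous is None:
            job = q.enqueue(func, result_ttl=result_ttl)
        else:
            job = q.enqueue(func, depends_on=previous, result_ttl=result_ttl)
        jobs.append(job)
        previous = job
    return jobs

# Usage: jobs = enqueue_pipeline(q, [A, B, C], result_ttl=-1)
```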