
Python RQ: how to trigger a job when multiple other jobs are finished? Is there a workaround for multi-job dependencies?

I have a nested job structure in my Python Redis Queue (RQ). First the rncopy job is executed. Once it has finished, the three dependent registration jobs follow. When all three of these jobs have finished, I want to trigger a job that sends a websocket notification to my frontend.

My current try:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

Unfortunately, it seems that the multi-job dependency feature was never merged into master. I saw that there are currently two pull requests for it on GitHub. Is there a workaround I can use?

Sorry for failing to provide a reproducible example.

florian asked Mar 24 '18 20:03




1 Answer

New versions (RQ >= 1.8)

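You can pass a list or a tuple of jobs to the depends_on parameter. The dependent job is enqueued only after all of them have finished, so no wrapper functions are needed.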

    # RQ 1.x documents the per-job timeout keyword as job_timeout (older releases used timeout).
    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, job_timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", job_timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", job_timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", job_timeout=6000, depends_on=rncopy)

    # The notification job waits for all three registration jobs.
    notify = redisqueue.enqueue(print, patient_finished, patientid, job_timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

    # A list works as well as a tuple:
    # notify = redisqueue.enqueue(print, patient_finished, patientid, job_timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])

Old versions (RQ < 1.8)

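I use this workaround: for n dependencies, I create n-1 wrappers around the real function. Each wrapper depends on a different job and, when it runs, enqueues the next step (the next wrapper, or finally the real function) with a dependency on the next job.

This solution is a bit convoluted, but it works.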

    from rq import Queue

    # The wrappers must be defined in a module that the worker can import.
    def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
        # Runs after t1c_reg; defers the next step until t2_reg has finished.
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

    def second_wrapper(patient_finished, patientid, fla_reg_id):
        # Runs after t2_reg; defers the real job until fla_reg has finished.
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)

    # Start the chain: first_wrapper waits on t1c_reg and receives the ids of the other two jobs.
    notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid, t2_reg.id, fla_reg.id, timeout=6000, depends_on=t1c_reg)

Some caveats:

  • I don't pass the queue object to the wrappers because doing so causes serialization problems, so each wrapper has to look the queue up by name.

  • For the same reason, I pass the job id (job.id) to the wrappers rather than the job object.

Stefano Fiorucci - anakin87 answered Oct 28 '22 09:10