I have a nested job structure in my Python Redis Queue (RQ) setup. First the rncopy job is executed. Once it finishes, the three dependent registration jobs follow. When all three of these jobs have finished, I want to trigger a job that sends a websocket notification to my frontend.
My current attempt:
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
Unfortunately, it seems the multi-job dependency feature was never merged into master. I saw that there are currently two open pull requests on GitHub. Is there a workaround I can use?
Sorry for failing to provide a reproducible example.
Each worker processes a single job at a time; within a worker, there is no concurrent processing. If you want to perform jobs concurrently, simply start more workers. In production, you should use a process manager like Supervisor or systemd to run the RQ workers.
rqworker is a command-line tool that starts an RQ worker; it ships with the python-rq package. A worker is a Python process that typically runs in the background and exists solely as a work horse to perform lengthy or blocking tasks that you don't want to perform inside web processes.
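A worker can also be started from Python instead of via the rqworker command. Here is a minimal sketch, assuming Redis runs locally and jobs go to a queue named 'default' (both assumptions; adjust to your setup); run several of these processes to get concurrency:

from redis import Redis
from rq import Queue, Worker

redis_conn = Redis()  # assumption: Redis on localhost with default settings

# Listen on the 'default' queue; each worker handles one job at a time.
worker = Worker([Queue('default', connection=redis_conn)], connection=redis_conn)
worker.work()  # blocks and processes jobs until stopped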
New versions (RQ >= 1.8)
You can simply pass a list or a tuple to the depends_on parameter, so the wrapper chain described below is no longer needed and the final job can be enqueued directly.
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
# you can also use a list instead of a tuple:
# notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])
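For completeness, the redisqueue object used in these snippets can be created as follows; a minimal sketch, assuming a local Redis instance and a queue named 'default' (both assumptions):

from redis import Redis
from rq import Queue

# Assumption: Redis on localhost with default settings.
redisqueue = Queue('default', connection=Redis())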
Old versions (RQ < 1.8)
I use this workaround: if there are n dependencies, I create n-1 wrappers around the real function; each wrapper depends on a different job.
This solution is a bit convoluted, but it works.
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid, t2_reg.id, fla_reg.id, timeout=6000, depends_on=t1c_reg)
from redis import Redis
from rq import Queue

def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
    # Recover the queue by name; the queue object itself cannot be passed in (see caveats).
    queue = Queue('YOUR-QUEUE-NAME', connection=Redis())  # assumes Redis on localhost
    queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

def second_wrapper(patient_finished, patientid, fla_reg_id):
    queue = Queue('YOUR-QUEUE-NAME', connection=Redis())
    queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)
Some caveats:
I don't pass the queue object to the wrappers, because serialization problems occur; the queue must therefore be recovered by name inside the wrapper.
For the same reason, I pass job.id (instead of the job object) to the wrappers; if a wrapper needs the job object back, it can re-fetch it by id, as shown below.
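A minimal sketch of re-fetching a job from its id using rq's Job.fetch, assuming the same local Redis connection as above:

from redis import Redis
from rq.job import Job

# Recover the job object from the id that was passed into the wrapper.
job = Job.fetch(fla_reg_id, connection=Redis())
print(job.get_status(), job.result)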