I'm wondering if there's a way to set up RabbitMQ or Redis to work with Celery so that when I send a task to the queue, it doesn't go into a list of tasks, but rather into a Set of tasks keyed based on the payload of my task, in order to avoid duplicates.
Here's my setup for more context: Python + Celery. I've tried RabbitMQ as a backend, now I'm using Redis as a backend because I don't need the 100% reliability, easier to use, small memory footprint, etc.
I have roughly 1000 ids that need work done repeatedly. Stage 1 of my data pipeline is triggered by a scheduler and it outputs tasks for stage 2. The tasks contain just the id for which work needs to be done and the actual data is stored in the database. I can run any combination or sequence of stage 1 and stage 2 tasks without harm.
If stage 2 doesn't have enough processing power to deal with the volume of tasks output by stage 1, my task queue grows and grows. This wouldn't have to be the case if the task queue used sets as the underlying data structure instead of lists.
Is there an off-the-shelf solution for switching from lists to sets as distributed task queues? Is Celery capable of this? I recently saw that Redis has just released an alpha version of a queue system, so that's not ready for production use just yet.
Should I architect my pipeline differently?
You can use an external data structure to store and monitor the current state of your celery queue. 1. Lets take a redis key-value for example. Whenever you push a task into celery, you mark a key with your 'id' field as true in redis.
Before trying to push a new task with any 'id', you would check if the key with 'id' is true in redis or not, if yes, you skip pushing the task.
To clear the keys at proper time, you can use after_return handler of celery, which runs when the task has returned. This handler will unset the key 'id' in redis , hence clearing the lock for next task push .
This method ensures you only have ONE instance per id of task running in celery queue. You can also enhance it to allow only N tasks per id by using INCR and DECR commands on the redis key, when the task is pushed and after_return of the task.
Can your tasks in stage 2 check whether the work has already been done and, if it has, then not do the work again? That way, even though your task list will grow, the amount of work you need to do won't.
I haven't come across a solution re the sets / lists, and I'd think there were lots of other ways of getting around this issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With