I'm using Resque workers to process jobs in a queue. The queue holds a large number of jobs (more than 1M), and some of them were added by mistake and need to be removed. Creating the queue with these jobs was not an easy task, so clearing the queue with resque-web and adding the correct jobs again is not an option for me.
Appreciate any advice. Thanks!
To remove a specific job from a queue, you can use the destroy method. It's very easy to use. For example, to remove a job with class Post and id x from the queue named queue1, you can do this:
Resque::Job.destroy('queue1', Post, 'x')
If you want to remove all the jobs of a particular class from a queue, leave out the arguments:
Resque::Job.destroy('queue1', Post)
You can find its documentation at:
http://www.rubydoc.info/gems/resque/Resque%2FJob.destroy
Resque's source (the Job class) has a method for this, which is probably what you need :)
# Removes a job from a queue. Expects a string queue name, a
# string class name, and, optionally, args.
#
# Returns the number of jobs destroyed.
#
# If no args are provided, it will remove all jobs of the class
# provided.
#
# That is, for these two jobs:
#
# { 'class' => 'UpdateGraph', 'args' => ['defunkt'] }
# { 'class' => 'UpdateGraph', 'args' => ['mojombo'] }
#
# The following call will remove both:
#
# Resque::Job.destroy(queue, 'UpdateGraph')
#
# Whereas specifying args will only remove the 2nd job:
#
# Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')
#
# This method can be potentially very slow and memory intensive,
# depending on the size of your queue, as it loads all jobs into
# a Ruby array before processing.
def self.destroy(queue, klass, *args)
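To make the documented matching rules concrete, here is a minimal sketch that models them on a plain Ruby array instead of a real Redis-backed queue. It is not Resque's implementation: simulate_destroy is a hypothetical name, the array stands in for the queue, and JSON from the standard library stands in for Resque's own encoding.

```ruby
require 'json'

# Hypothetical in-memory model of the documented behavior.
# `queue` is an Array of JSON payload strings standing in for a Resque queue.
# Returns the number of payloads removed.
def simulate_destroy(queue, klass, *args)
  klass = klass.to_s
  before = queue.size
  queue.reject! do |raw|
    job = JSON.parse(raw)
    # No args given: match every job of the class.
    # Args given: match only jobs whose args are exactly equal.
    job['class'] == klass && (args.empty? || job['args'] == args)
  end
  before - queue.size
end

queue = [
  JSON.generate('class' => 'UpdateGraph', 'args' => ['defunkt']),
  JSON.generate('class' => 'UpdateGraph', 'args' => ['mojombo']),
  JSON.generate('class' => 'SendEmail',   'args' => ['mojombo'])
]

# Removes only the UpdateGraph job whose args are ['mojombo'].
removed_one = simulate_destroy(queue, 'UpdateGraph', 'mojombo')

# Removes every remaining UpdateGraph job, whatever its args.
removed_rest = simulate_destroy(queue, 'UpdateGraph')
```

Note that when args are given, the whole argument list has to match, which is why this form only helps if you know every argument of the job.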
The above solutions work great if you know all of the arguments passed to the job. If you only know some of the arguments, the following script will work:
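queue_name = 'a_queue'
# Peek at the first 500,000 raw job payloads; raise the count if the queue is longer.
jobs = Resque.data_store.peek_in_queue(queue_name, 0, 500_000)
deleted_count = 0
jobs.each do |job|
  decoded_job = Resque.decode(job)
  # Match on the job class and on any one known argument.
  if decoded_job['class'] == 'CoolJob' && decoded_job['args'].include?('a_job_argument')
    # Remove by the raw payload string, not the decoded hash.
    Resque.data_store.remove_from_queue(queue_name, job)
    deleted_count += 1
    puts "Deleted!"
  end
end
puts deleted_count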
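Before deleting anything from a queue that was hard to build, it may help to do a dry run first. The sketch below is one way to do that, under some assumptions: payloads_to_remove is a hypothetical helper name, the sample array stands in for what peek_in_queue returns, and JSON.parse stands in for Resque.decode so the example runs without Redis.

```ruby
require 'json'

# Hypothetical helper: returns the raw payloads whose class matches `klass`
# and whose args include `arg`. It does not touch the queue.
def payloads_to_remove(raw_jobs, klass, arg)
  raw_jobs.select do |raw|
    job = JSON.parse(raw)
    job['class'] == klass.to_s && Array(job['args']).include?(arg)
  end
end

# Sample payloads standing in for the result of peek_in_queue.
sample = [
  JSON.generate('class' => 'CoolJob',  'args' => ['a_job_argument', 42]),
  JSON.generate('class' => 'CoolJob',  'args' => ['something_else']),
  JSON.generate('class' => 'OtherJob', 'args' => ['a_job_argument']),
  JSON.generate('class' => 'CoolJob',  'args' => [1, 'a_job_argument'])
]

doomed = payloads_to_remove(sample, 'CoolJob', 'a_job_argument')
puts "Would delete #{doomed.size} of #{sample.size} jobs"
```

Once the count looks right, each string in doomed could be passed to Resque.data_store.remove_from_queue(queue_name, raw), as in the script above.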