I started working with Sidekiq recently and noticed that it has an awesome feature that I've been searching for a long time:
UserMailer.delay_until(5.days.from_now).find_more_friends_email
Basically I can schedule a job in the future so I don't need my app to poll continuously for new events with a start time.
Now this works like a charm, but how do I change the start time of a job? Often some scheduled events will have their start time changed. How do I replicate this in sidekiq?
I know I can delete the job and create a new one, but is it possible to just modify the start time?
EDIT:
I built upon Oto Brglez's idea and here is the documented code:
module TaskStuff
class TaskSetter
include Sidekiq::Worker
sidekiq_options retry: false
def perform(task_id)
task = Task.find(task_id)
# Get the worker that's performing this job
if worker = AppHelpers.find_worker(jid)
# If the worker matches the "at" timestamp then this is the right one and we should execute it
if worker.last["payload"]["at"].to_s.match(/(\d*\.\d{0,3})\d*/)[1] == task.start.to_f.to_s.match(/(\d*\.\d{0,3})\d*/)[1]
task.execute
else
custom_logger.debug("This worker is the wrong one. Skipping...")
end
else
custom_logger.error("We couldn't find the worker")
end
end
end
end
module AppHelpers
[...]
def self.find_worker(jid)
Sidekiq::Workers.new.select {|e| e.last["payload"]["jid"] == jid}.first
end
[...]
end
> task = Task.create(start: 5.hours.from_now)
> TaskStuff::TastSetter.perform_at(task.start, task.id)
Now if I do this
> task.update_attributes(start: 4.hours.from_now)
> TaskStuff::TastSetter.perform_at(task.start, task.id)
the task will get executed in 4 hours and the other job (that will get executed in 5 hours) will get ignored and removed when it reaches it's time.
Initially I tried using Time.now
instead of worker.last["payload"]["at"]
but that could have been pretty inaccurate because a scheduled job will not always get executed on time. There is a check interval of 15 seconds, and if all workers are busy elsewhere, the job could be delayed further.
I had to use Regexp
for matching the start time because when reading the task.start
I might have gotten a float with a different number of decimals and the if condition would not pass. This way I'm getting both values to 3 decimal.
I found that the only way of getting the "at" attribute of the job is by getting it through the worker. If I were to ask Redis or use Mike's Sidekiq::ScheduledSet.new
I would not get the current job because it would have already been pulled out of Redis.
EDIT 2:
For anyone interested, I went with a similar but different approach. Basically instead of comparing the start time of the Task
, I added an extra field to the model and Sidekiq call, called start_token
. If the sidekiq job has been called with the same token that the object has then it's valid, otherwise discard and skip the job. The token gets updated every time the model changes start_time.
Sidekiq Scheduler is a lightweight job scheduling extension for Sidekiq. It uses Rufus Scheduler under the hood, that is itself an in-memory scheduler. Sidekiq Scheduler extends Sidekiq by starting a Rufus Scheduler thread in the same process, loading and maintaining the schedules for it.
To run sidekiq, you will need to open a terminal, navigate to your application's directory, and start the sidekiq process, exactly as you would start a web server for the application itself. When the command executes you will see a message that sidekiq has started.
The author of Sidekiq recommends a limit of 1,000 jobs per bulk enqueue and that's the default in the perform_bulk method. Even then you are saving the time of 999 round trips to Redis.
On the browser, goto ==> http://localhost:{port}/sidekiq/recurring-jobs . where {port} is the port your application is running in. You will see the list of scheduled jobs for your application and some other details about it. Save this answer.
This is probably not answering the question, but this question is first on google for "sidekiq increase the time of an already scheduled job".
Sidekiq has this method(reschedule
) on a SortedEntry job.
Find the job:
job = Sidekiq::ScheduledSet.new.find_job(job_id)
reschedule the job:
job.reschedule(Time.now + 2.hours)
I dont think editing/updating jobs is the way. I usually create new job each time something changes. And then in the job itself check if the time to execute certain task is right... If the time is right, continue with the task otherwise just skip...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With