I have a background job that runs a map/reduce job on MongoDB. When the user sends in more data for the document, it kicks off a background job that runs on that document. If the user sends in multiple requests, it kicks off multiple background jobs for the same document, but only one really needs to run. Is there a way I can prevent multiple duplicate instances? I was thinking of creating a queue for each document and making sure it is empty before I submit a new job. Or perhaps I can somehow set a job ID that is the same as my document ID, and check that none exists before submitting it?
Also, I just found the sidekiq-unique-jobs gem, but the documentation is non-existent. Does it do what I want?
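Roughly what I had in mind, as an untested sketch (the Redis key name and MapReduceWorker are just placeholders for my real code):

def enqueue_map_reduce(document_id)
  key = "map_reduce_pending:#{document_id}"
  # SET with nx: true only succeeds if the key does not exist yet,
  # so a second request for the same document skips the enqueue.
  if Sidekiq.redis { |redis| redis.set(key, 1, nx: true, ex: 24 * 60 * 60) }
    MapReduceWorker.perform_async(document_id)
  end
end

# The worker would have to delete the key when it finishes; otherwise the
# document stays "locked" until the key expires.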
Today Sidekiq uses a default concurrency of 25. This means Sidekiq will spawn 25 worker threads and execute up to 25 jobs concurrently in a process.
Sidekiq is multithreaded, so your workers must be thread-safe.
If you build a web application, you should minimize the amount of time spent responding to every user; a fast website means a happy user.
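To make the thread-safety point concrete (my own illustrative example, not from the Sidekiq docs): class-level mutable state in a worker is shared by every one of those 25 threads, so interleaved updates can be lost.

class UnsafeWorker
  include Sidekiq::Worker

  @@processed = 0  # shared across all worker threads in the process

  def perform(doc_id)
    # read-modify-write on shared state; two threads can interleave and drop updates
    @@processed += 1
  end
end

# Safer: keep per-job state in locals or instance variables (a new worker
# instance is created for each job), or move shared counters into Redis, e.g.
# Sidekiq.redis { |redis| redis.incr("processed") }, which is atomic.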
My initial suggestion would be a mutex for this specific job. But since there's a chance that you may have multiple application servers working the Sidekiq jobs, I would suggest something at the Redis level.
For instance, use redis-semaphore within your Sidekiq worker definition. An untested example:
def perform
  s = Redis::Semaphore.new(:map_reduce_semaphore, connection: "localhost")

  # verify that this sidekiq worker is the first to reach this semaphore.
  unless s.locked?
    # auto-unlocks in 90 seconds; set to what is reasonable for your worker.
    s.lock(90)
    your_map_reduce()
    s.unlock
  end
end

def your_map_reduce
  # ...
end
https://github.com/krasnoukhov/sidekiq-middleware
UniqueJobs: provides uniqueness for jobs.
Usage
Example worker:
class UniqueWorker
  include Sidekiq::Worker

  sidekiq_options({
    # Should be set to true (enables uniqueness for async jobs)
    # or :all (enables uniqueness for both async and scheduled jobs)
    unique: :all,

    # Unique expiration (optional, default is 30 minutes).
    # For scheduled jobs it is calculated automatically from the schedule time and the expiration period.
    expiration: 24 * 60 * 60
  })

  def perform
    # Your code goes here
  end
end
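If I read the middleware correctly, uniqueness is keyed on the worker class plus its arguments, so passing the document id as the job argument should collapse duplicate requests for the same document (worth verifying against the gem's source; document and other below stand in for your own model objects):

UniqueWorker.perform_async(document.id)  # enqueued
UniqueWorker.perform_async(document.id)  # skipped while an identical job is still pending/unexpired
UniqueWorker.perform_async(other.id)     # different argument, so enqueued normally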