I have a Rails app that fetches a lot of emails from multiple IMAP accounts.
Two issues though:
My code:
class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|
      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end
class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end
The Sidekiq process pops jobs from Redis and executes them. In your redis-cli MONITOR output you should see LPUSH commands when Rails enqueues a job, and BRPOP commands from Sidekiq. Make sure that both the Rails and Sidekiq processes use the same Redis server, database number, and namespace (if any).
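For example, a shared config/initializers/sidekiq.rb along these lines (the Redis URL and database number below are placeholders, not values from the question) keeps both processes pointed at the same instance:

# config/initializers/sidekiq.rb
# Both the Sidekiq server process and any Rails process that enqueues jobs
# should use the same Redis URL and database number.
redis_settings = { url: ENV.fetch("REDIS_URL", "redis://localhost:6379/0") }

Sidekiq.configure_server do |config|
  config.redis = redis_settings
end

Sidekiq.configure_client do |config|
  config.redis = redis_settings
end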
Sidekiq is a Redis-backed job processor that can handle thousands of jobs per second. Follow the steps below to add Sidekiq and Redis to your existing application. Note: the redis gem is installed as a dependency of sidekiq, so you don't have to install it separately.
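For example, with Bundler the basic setup is just the gem plus a separate worker process; the commands below assume a standard Rails/Bundler project:

# Gemfile
gem "sidekiq"   # pulls in the redis gem as a dependency

Then install and start a worker process alongside your Rails server:

bundle install
bundle exec sidekiq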
If you build a web application, you should minimize the time spent responding to each user; a fast website means a happy user.
The author of Sidekiq recommends a limit of 1,000 jobs per bulk enqueue and that's the default in the perform_bulk method. Even then you are saving the time of 999 round trips to Redis.
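As a sketch (assuming Sidekiq 6.3 or newer, where worker classes expose perform_bulk; older versions can use Sidekiq::Client.push_bulk), the per-user jobs above could be enqueued in a handful of round trips:

# Enqueue one ImapJob per user in batches instead of one round trip per job.
# perform_bulk takes an array of argument arrays and splits it into batches
# of 1,000 jobs by default.
args = User.all.select { |u| u.imap_accounts.exists? }.map { |u| [u._id.to_s] }
ImapJob.perform_bulk(args)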
Subclass Redis and overload blpop to accept -1 for non-blocking use of lpop.
redis-semaphore calls @redis.blpop in Redis::Semaphore#lock. While you could overload the lock method to use @redis.lpop instead, a much simpler approach is to pass a custom instance of Redis to the semaphore.
Place the following class in the lib directory of your Rails app and require it in your config/initializers/sidekiq.rb (or load it however you prefer).
class NonBlockingRedis < Redis
  # redis-semaphore calls blpop, which blocks until a token is available.
  # Treat a timeout of -1 as "don't block": use lpop instead and mimic
  # blpop's [key, value] return format.
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil?
      return [key, result]
    else
      super(key, timeout)
    end
  end
end
Whenever you call Redis::Semaphore.new, pass a :redis key with a new instance of the NonBlockingRedis class. Call s.lock with -1 as an argument to use lpop instead of blpop.
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock(-1)
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end
sidekiq_options retry: false in your worker class should work; see below for an example. In your question you didn't specify which worker's jobs were ending up in the retry queue. Since FetchMailsJobs enqueues ImapJob jobs, an exception in the former may make it appear that ImapJob is being re-queued.
With your semaphore lock, it would also be a good idea to wrap your work in a begin/rescue/ensure block.
class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|
      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock(-1)
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # ignore; do nothing
      ensure
        s.unlock
      end
    end
  end
end
See sidekiq Advanced Options: workers for more information.
Would it not be possible to ditch redis-semaphore and use the sidekiq-unique-jobs gem? It seems like a nicely contained tool that does exactly what you're looking for.
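A rough sketch of that approach (the lock: option name is from recent sidekiq-unique-jobs releases; older versions used a unique: option instead, so check the gem's README for your version):

class ImapJob
  include Sidekiq::Worker
  # Allow only one ImapJob per set of arguments (i.e. per user_id) to be
  # queued or running at a time, replacing the redis-semaphore lock.
  sidekiq_options lock: :until_executed, retry: false

  def perform(user_id)
    user = User.where(_id: user_id).first
    ImapMails.receive_mails(user)
  end
end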