Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How best to keep a job queue clean of retry/duplicate jobs (using sidekiq and redis-semaphore)

I have a rails app that fetches a lot of emails from multiple IMAP accounts.

  • I use sidekiq to handle the jobs.
  • I use sidetiq to schedule the jobs.
  • I use redis-semaphore to ensure that recurring jobs for the same user don't stumble upon each other.

2 issues though:

  • 1: When a job hits "if s.lock" redis-semaphore puts it on hold until all previous jobs have finished. I need the job to be cancelled instead of being queued.
  • 2: If during a job an exception is raised, resulting in a crash, sidekiq will put the job back into the queue for a retry. I need the job to be cancelled instead of being queued. Putting "sidekiq_options :retry => false" into the code does not seem to make a difference.

My code:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end
like image 378
Cjoerg Avatar asked May 04 '13 23:05

Cjoerg


People also ask

How does Redis work with Sidekiq?

Sidekiq process pops jobs from Redis and executes them. In your MONITOR output, you should see LPUSH commands when Rails sends mail. You should also see BRPOP commands from Sidekiq. You need to make sure that both Rails and Sidekiq processes use the same Redis server, database number, and namespace (if any).

Does Sidekiq need Redis?

Sidekiq is supported by Redis as a job management tool to process thousands of jobs in a second. Follow the steps to add Sidekiq and Redis to your existing application. Note: Redis gem is also installed in sidekiq, you don't have to install it independently.

Should I use Activejob Sidekiq?

If you build a web application you should minimize the amount of time spent responding to every user; a fast website means a happy user.

How many jobs can Sidekiq handle?

The author of Sidekiq recommends a limit of 1,000 jobs per bulk enqueue and that's the default in the perform_bulk method. Even then you are saving the time of 999 round trips to Redis.


2 Answers

1. Create a Redis subclass and overload blpop to accept -1 for non-blocking use of lpop.

redis-semaphore calls @redis.blpop in Redis::Semaphore#lock. While you could overload the lock method to use @redis.lpop instead, a much simpler approach would be to pass a custom instance of Redis to the semaphore.

Place the following in the lib of your rails app and require it in your config/initializers/sidekiq.rb (or do whatever your preference might be for loading the following class).

class NonBlockingRedis < Redis
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil?
      return [key, result]
    else
      super(key, timeout)
    end
  end
end

Whenever you call Redis::Semaphore.new, pass a :redis key with a new instance of the NonBlockingRedis class.

Call s.lock with -1 as an argument to use lpop instead of blpop.

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end

2. Using sidekiq_options retry: false in your worker class should work, see below for an example.

In your question, you didn't specify which worker you were having problems with jobs ending up in the retry queue. Since FetchMailsJobs ends up enqueing ImapJob jobs, an exception in the former may cause it to appear that the ImapJob is being re-queued.

With your semaphore lock, it would also be a good idea to wrap your work in a begin rescue ensure block.

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock - 1
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # ignore; do nothing
      ensure
        s.unlock
      end
    end
  end
end

See sidekiq Advanced Options: workers for more information.

like image 97
potatosalad Avatar answered Nov 15 '22 03:11

potatosalad


Would it not be possible to ditch using redis-semaphore and use the sidekiq-unique-jobs gem ? It seems like a nicely contained tool that does exactly what you're looking for.

like image 21
Ivar Avatar answered Nov 15 '22 03:11

Ivar