
Rails 4.2 get delayed job id from active job

Any idea how to get the Delayed::Job id when enqueuing through ActiveJob? When I enqueue a job I get back an instance of ActiveJob::Base with a @job_id, but that job id seems to be internal to ActiveJob. My best guess so far is to walk through the most recently created jobs:

active_job_id = GenerateReportJob.perform_later(self.id).job_id
delayed_job = Delayed::Job.order(id: :desc).limit(5).detect do |job|
  YAML.load(job.handler).job_data['job_id'] == active_job_id
end

but that seems all kinds of hacky. Kind of surprised ActiveJob isn't returning the ID from Delayed::Job, especially since that is what is explicitly returned when the job gets enqueued.

== EDIT

Looks like I'm not the only one (https://github.com/rails/rails/issues/18821)

kddeisz asked Apr 24 '15 19:04


3 Answers

In case anyone finds this in the future: Rails has accepted a patch that exposes this id as provider_job_id in Rails 5. You can get it to work on Rails 4.2 with a patch like:

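# Rails 4.2 does not define provider_job_id on jobs yet, so add it if missing.
unless ActiveJob::Base.method_defined?(:provider_job_id)
  ActiveJob::Base.send(:attr_accessor, :provider_job_id)
end

# Wrap the adapter's class-level enqueue methods and copy the Delayed::Job id onto the job.
ActiveJob::QueueAdapters::DelayedJobAdapter.singleton_class.prepend(Module.new do
  def enqueue(job)
    provider_job = super
    job.provider_job_id = provider_job.id
    provider_job
  end

  def enqueue_at(job, timestamp)
    provider_job = super
    job.provider_job_id = provider_job.id
    provider_job
  end
end)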
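
The patch relies on prepending an anonymous module to the adapter's singleton class, so the override runs before the original class method and super still reaches it. A minimal sketch of just that mechanism, runnable without Rails; FakeAdapter, FakeJob and FakeProviderJob are hypothetical stand-ins, not the real ActiveJob or Delayed::Job classes:

```ruby
# Stand-ins for illustration only; these are NOT the real Rails or Delayed::Job classes.
FakeProviderJob = Struct.new(:id)

class FakeJob
  attr_accessor :provider_job_id
end

class FakeAdapter
  class << self
    # Mimics an adapter whose class-level enqueue returns the backend record.
    def enqueue(_job)
      FakeProviderJob.new(42)
    end
  end
end

# Prepend onto the singleton class so the override sits in front of the
# class method and `super` reaches the original implementation.
FakeAdapter.singleton_class.prepend(Module.new do
  def enqueue(job)
    provider_job = super
    job.provider_job_id = provider_job.id
    provider_job
  end
end)

job = FakeJob.new
result = FakeAdapter.enqueue(job)
puts job.provider_job_id # prints 42
```

The return value is passed through unchanged, so callers that relied on the original enqueue result should not notice the wrapper.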
kddeisz answered Oct 20 '22 22:10


Inspired by the answer of Beguene and some reverse engineering of the Rails 5 ActiveJob code, I have made it work with Rails 4.2 by

1) adding the following code to lib/active_job/queue_adapters/delayed_job_adapter.rb or config/initializers/delayed_job.rb (both locations worked):

# file: lib/active_job/queue_adapters/delayed_job_adapter.rb
module ActiveJob
  module Core
    # ID optionally provided by adapter
    attr_accessor :provider_job_id
  end

  module QueueAdapters
    class DelayedJobAdapter
      class << self
        def enqueue(job) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
          job.provider_job_id = delayed_job.id
          delayed_job
        end

        def enqueue_at(job, timestamp) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
          job.provider_job_id = delayed_job.id
          delayed_job
        end
      end
      class JobWrapper #:nodoc:
        attr_accessor :job_data

        def initialize(job_data)
          @job_data = job_data
        end

        def perform
          Base.execute(job_data)
        end
      end
    end
  end
end

The attr_accessor :provider_job_id statement is needed in Rails 4.2, since it is used in the enqueue method and is not yet defined in 4.2.
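
Reopening the module works even though it has already been included, because Ruby looks methods up through the module at call time. A minimal sketch of that behavior in plain Ruby; CoreLike and BaseLike are hypothetical names standing in for ActiveJob::Core and ActiveJob::Base:

```ruby
# Hypothetical names for illustration; CoreLike stands in for ActiveJob::Core.
module CoreLike
end

class BaseLike
  include CoreLike
end

# Reopen the module later, as an initializer would, and add the accessor.
module CoreLike
  attr_accessor :provider_job_id
end

# Instances of the including class pick up the new reader and writer.
job = BaseLike.new
job.provider_job_id = 7
puts job.provider_job_id # prints 7
```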

Then we can make use of it as follows:

2) define our own ActiveJob class:

# file: app/jobs/my_job.rb
class MyJob < ActiveJob::Base
  queue_as :default

  # Calls the method named by performmethod on object and returns its result.
  def perform(object, performmethod)
    object.send(performmethod)
  end
end

3) Now we can create a new job anywhere in the code:

job = MyJob.perform_later(Myobject, "mymethod")

This will put the method Myobject.mymethod into the queue.

4) The code in 1) helps us to find the Delayed Job that is associated with our job:

delayed_job = Delayed::Job.find(job.provider_job_id)

5) Finally, we can do whatever we need to do with the delayed_job, e.g. delete it:

delayed_job.delete
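
One caveat with steps 4) and 5): Delayed::Job removes a job's row once it has run successfully, so the record may already be gone when you look it up, and find raises ActiveRecord::RecordNotFound in that case. A minimal sketch of a tolerant lookup, runnable without Rails; FakeStore, FakeRecord and cancel_job are hypothetical stand-ins (with the real model, Delayed::Job.find_by(id: ...) returns nil for a missing row):

```ruby
# FakeStore, FakeRecord and cancel_job are hypothetical stand-ins, not part of Delayed::Job.
FakeRecord = Struct.new(:id, :deleted) do
  def delete
    self.deleted = true
  end
end

class FakeStore
  def initialize(records)
    @records = records
  end

  # Mirrors ActiveRecord's find_by: returns nil instead of raising.
  def find_by(id:)
    @records.find { |r| r.id == id }
  end
end

# Deletes the backend record if it still exists; returns true when it did.
def cancel_job(store, provider_job_id)
  record = store.find_by(id: provider_job_id)
  return false if record.nil?
  record.delete
  true
end

store = FakeStore.new([FakeRecord.new(1, false)])
puts cancel_job(store, 1)  # record exists
puts cancel_job(store, 99) # record already gone
```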

Note: in Rails 5, step 1) is no longer needed, since this behavior is built into Rails 5.

Olli answered Oct 20 '22 21:10


I made it work in Rails 4.2 using the new patch from Rails 5 like this:

Create the file lib/active_job/queue_adapters/delayed_job_adapter.rb:

module ActiveJob
  module QueueAdapters
    class DelayedJobAdapter
      class << self
        def enqueue(job) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
          job.provider_job_id = delayed_job.id
          delayed_job
        end

        def enqueue_at(job, timestamp) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
          job.provider_job_id = delayed_job.id
          delayed_job
        end
      end

      class JobWrapper #:nodoc:
        attr_accessor :job_data

        def initialize(job_data)
          @job_data = job_data
        end

        def perform
          Base.execute(job_data)
        end
      end
    end
  end
end
Beguene answered Oct 20 '22 22:10