
Is it possible to terminate an already running delayed job using Ruby Threading?

Let's say I have delayed_job running in the background. Tasks can be scheduled or run immediately (some are long-running, some are not).

If a task takes too long, a user should be able to cancel it. Is this possible with delayed_job? I checked the docs and can't find a terminate method or anything similar. They only provide a way to stop delayed_job itself, which cancels all tasks; I need to cancel one specific running task.

UPDATE: My boss (who's a great programmer, by the way) suggested using Ruby threads for this feature. Is this possible? For example, creating a new thread per task and killing that thread while it's running?

Something like:

t1 = Thread.new(task.run)
self.delay.t1.join (?) -- still reading up on threads, so correct me if I'm wrong

Then to stop it I'll just use t1.stop (?). Again, I don't know yet.

Is this possible? Thanks!

corroded asked Apr 06 '11 11:04


3 Answers

It seems my boss was on the right track, so here's what we did (please tell us if any of this is bad practice so I can bring it up):

  1. First, we have a Job model with an execute! method (which does the job's actual work).
  2. Next, we have a delayed_job worker in the background, listening for new jobs. When you create a job, you can schedule it to run immediately or on a recurring day (we use rufus for that).
  3. When a job is created, it checks whether it's supposed to run immediately. If so, it adds itself to the delayed_job queue. The execute! method creates a Thread, so each job has its own thread.
  4. In the UI, the user can see whether a job is running (it has a started_at and no finished_at). If it is running, there's a button to cancel it. Canceling just sets the job's canceled_at to Time.now.
  5. While the job is running, it also checks whether it has a canceled_at or whether Time.now is past finished_at. If so, it kills the thread.
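
The steps above can be sketched roughly as follows. This is only a minimal illustration under stated assumptions, not the actual implementation: SketchJob is a hypothetical in-memory stand-in for the Job model, the sleep call stands in for a unit of real work, and instead of killing the thread from outside, the worker checks canceled_at between steps and stops itself, which avoids interrupting the work at an arbitrary point.

```ruby
# Hypothetical in-memory stand-in for the Job model described above.
# A real app would persist these timestamps in the database.
class SketchJob
  attr_reader :started_at, :finished_at, :steps_done

  def initialize(total_steps)
    @total_steps = total_steps
    @steps_done = 0
    @canceled_at = nil
    @lock = Mutex.new
  end

  # What the cancel button would call: it only records a timestamp.
  def cancel!
    @lock.synchronize { @canceled_at ||= Time.now }
  end

  def canceled_at
    @lock.synchronize { @canceled_at }
  end

  def canceled?
    !canceled_at.nil?
  end

  # Runs the work in its own thread and returns that thread.
  def execute!
    @started_at = Time.now
    Thread.new do
      @total_steps.times do
        break if canceled?   # check the flag between units of work
        sleep 0.01           # placeholder for one unit of real work
        @steps_done += 1
      end
      # Only mark the job finished if it ran to completion.
      @finished_at = Time.now unless canceled?
    end
  end
end

job = SketchJob.new(1_000)
worker = job.execute!
sleep 0.05      # let a few steps run
job.cancel!     # simulates the user pressing the cancel button
worker.join     # the worker notices the flag and exits on its own
puts "stopped after #{job.steps_done} of 1000 steps"
```

The cancellation latency here is bounded by the length of one step, so long steps would need their own internal checks.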

Voila! We've tested it with one job and it seems to work. The only remaining problem is scaling...

If you see any problems with this, please say so in the comments or offer more suggestions :) I hope this helps someone too!

corroded answered Oct 21 '22 17:10


Delayed::Job is an ActiveRecord::Base model, so you can query it just like any other model, e.g. Delayed::Job.all(:conditions => {:last_error => nil}).

Delayed::Job objects have a payload field which contains a serialized version of the method or job that you're attempting to run. You access it through the #payload_object method, which loads the object in question.

You can combine these two capabilities to make queryable job workers. For instance, if you have a User model with a paperclip'ed :avatar, you can add a method to delete unprocessed jobs like so:

class User < ActiveRecord::Base
  has_attached_file :avatar, PaperclipOptions.new(:avatar)
  before_create :process_avatar_later

  # Stash the upload on disk and hand the processing off to a background job.
  # Note: id is not assigned until the record is saved, so this hook may need
  # to be after_create for the filename and job arguments to be meaningful.
  def process_avatar_later
    filename = Rails.root.join('tmp/avatars_for_processing', id.to_s)
    File.open(filename, 'w') { |file| file << avatar.to_file }
    Delayed::Job.enqueue(WorkAvatar.new(id, filename))
    self.avatar = nil
  end

  def cancel_future_avatar_processing
    WorkAvatar.future_jobs_for_user(id).each(&:destroy)
    # ummm... tell them to reupload their avatar, I guess?
  end

  class WorkAvatar < Struct.new(:user_id, :path)
    def user
      @user ||= User.find(user_id)
    end

    # All queued jobs whose serialized payload mentions WorkAvatar.
    def self.all_jobs
      Delayed::Job.scoped(:conditions => 'payload like "%WorkAvatar%"')
    end

    # Jobs for this user that no worker has locked (i.e. started) yet.
    def self.future_jobs_for_user(user_id)
      all_jobs.scoped(:conditions => {:locked_at => nil}).select do |job|
        job.payload_object.user_id == user_id
      end
    end

    def perform
      # Go through the memoized reader; @user is nil until user is called.
      user.avatar = File.open(path, 'rb')
      user.save
    end
  end
end

It's possible someone has made a plugin to make queryable objects like this. Searching GitHub might be fruitful.

Note also that if you want to cancel a job that has locked_at and locked_by set, you'd have to use whatever process monitoring tools you have to stop the worker process that is executing it.

Tim Snowhite answered Oct 21 '22 18:10


You can wrap the task in a Timeout block.

require 'timeout'

class TaskWithTimeout < Struct.new(:parameter)
  def perform
    Timeout.timeout(10) do
      # ...
    end
  rescue Timeout::Error => e
    # the task took longer than 10 seconds
  end
end
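
To see the behavior in isolation, here is a small runnable sketch outside of delayed_job. The run_with_limit helper is hypothetical, introduced only for illustration. Note that a timeout is a limit fixed in advance, so it caps how long a task may run rather than letting a user cancel it on demand.

```ruby
require 'timeout'

# Hypothetical helper: runs the block and returns its value,
# or :timed_out if the block exceeds the given number of seconds.
def run_with_limit(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  :timed_out
end

fast = run_with_limit(1) { :done }               # finishes well within the limit
slow = run_with_limit(0.1) { sleep 1; :done }    # interrupted by the timeout

puts fast.inspect
puts slow.inspect
```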
Simone Carletti answered Oct 21 '22 17:10