APScheduler how to add job outside the scheduler?

I have a simple requirement. I am running apscheduler as a separate process. I have another jobproducer script from where I want to add a job to the scheduler and run it.

This is my scheduler code:

# appsched.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()

This is my job producer script:

# jobproducer.py
from appsched import scheduler

def say_hello_job():
    print "Hello"

scheduler.add_job(say_hello_job, 'interval', minutes=1)

Needless to say, this did not work. Is there a way to make it work, perhaps by using a jobstore? How can a scheduler be shared among multiple different job producers?

Mayur Rokade asked Sep 03 '15 10:09

1 Answer

I had a similar problem, where my scheduler process was a uWSGI MULE process and there was a separate app where I wanted to add new jobs.

Looking at the BaseScheduler's add_job() function:

with self._jobstores_lock:
    if not self.running:
        self._pending_jobs.append((job, jobstore, replace_existing))
        self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
    else:
        self._real_add_job(job, jobstore, replace_existing, True)

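you can see the problem: add_job() writes the job to the job store only if the scheduler is already running. Otherwise the job just sits in the in-memory _pending_jobs list of the process that called add_job(), so a separate scheduler process never sees it.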
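The branch above can be mimicked with a small toy model. This is only a sketch to illustrate the behavior: ToyScheduler and its jobstore list are hypothetical stand-ins, not APScheduler classes, and the list merely plays the role of a shared persistent job store.

```python
import threading


class ToyScheduler:
    """Toy stand-in for the add_job() branch above; not APScheduler code."""

    def __init__(self):
        self.running = False
        self._lock = threading.RLock()
        self._pending_jobs = []  # lives only in this process's memory
        self.jobstore = []       # stands in for a shared, persistent job store

    def add_job(self, job):
        with self._lock:
            if not self.running:
                # Not started: the job never leaves this process.
                self._pending_jobs.append(job)
            else:
                # Started: the job is written to the job store.
                self.jobstore.append(job)

    def start(self):
        with self._lock:
            self.running = True
            # Flush whatever was queued before the start.
            self.jobstore.extend(self._pending_jobs)
            del self._pending_jobs[:]


producer = ToyScheduler()
producer.add_job("say_hello_job")
stored_before_start = list(producer.jobstore)  # nothing stored yet
producer.start()
stored_after_start = list(producer.jobstore)   # the pending job was flushed
```

In the job producer from the question the scheduler is never in the running state, so the job stays in the pending list and never reaches a store that another process could read.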

Fortunately, the solution is quite simple. We define our own "add-job-only" scheduler that always writes jobs straight to the job store and never runs them itself:

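import six
from apscheduler.job import Job
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.util import undefined

class JobAddScheduler(BlockingScheduler):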
  def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined,
              coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default',
              executor='default', replace_existing=False, **trigger_args):

    job_kwargs = {
      'trigger': self._create_trigger(trigger, trigger_args),
      'executor': executor,
      'func': func,
      'args': tuple(args) if args is not None else (),
      'kwargs': dict(kwargs) if kwargs is not None else {},
      'id': id,
      'name': name,
      'misfire_grace_time': misfire_grace_time,
      'coalesce': coalesce,
      'max_instances': max_instances,
      'next_run_time': next_run_time
    }
    job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined)
    job = Job(self, **job_kwargs)

    # Add jobs to job store
    with self._jobstores_lock:
      self._real_add_job(job, jobstore, replace_existing, True)

    return job

  def start(self):
    pass

  def shutdown(self, wait=True):
    pass

  def _main_loop(self):
    pass

  def wakeup(self):
    pass

Then we can add cron jobs instantaneously:

jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)

Don't forget to configure the scheduler before adding jobs! In my case I used an SQLAlchemy-MySQL backend for storing jobs:

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymysql://USER:PASSWORD@SERVER/DATABASE'))
jobscheduler.configure(jobstores=jobstores)

I'm not sure about the other jobstores, but after I added a new job, I had to call the wakeup() function of the separate scheduler process to "activate" the job. I achieved this using uWSGI's signal system.
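If uWSGI is not available, the same wake-up could be delivered over any inter-process channel. The following is a minimal sketch using only the standard library's multiprocessing.connection module. It is a swapped-in alternative, not what I used: serve_wakeups, request_wakeup and AUTHKEY are hypothetical names, and a thread plays the part of the scheduler process so the demo fits in one file.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"change-me"  # hypothetical shared secret known to both processes


def serve_wakeups(listener, on_wakeup, max_messages):
    """Scheduler side: call on_wakeup() for each 'wakeup' message received."""
    for _ in range(max_messages):
        with listener.accept() as conn:
            if conn.recv() == "wakeup":
                on_wakeup()
                conn.send("ok")


def request_wakeup(address):
    """Producer side: ask the scheduler process to re-read its job store."""
    with Client(address, authkey=AUTHKEY) as conn:
        conn.send("wakeup")
        return conn.recv()


# Demo in a single process: a thread stands in for the scheduler process.
wakeups = []
listener = Listener(("localhost", 0), authkey=AUTHKEY)  # port 0: OS picks a free port
server = threading.Thread(
    target=serve_wakeups,
    # In the real scheduler process, on_wakeup would be scheduler.wakeup.
    args=(listener, lambda: wakeups.append("woken"), 1),
)
server.start()
reply = request_wakeup(listener.address)
server.join()
listener.close()
```

In a real setup the listener thread would have to be started before the blocking scheduler.start() call, on a fixed address that the job producers know, and the producer would call request_wakeup() right after jobscheduler.add_job(...).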

Dauros answered Sep 28 '22 00:09