I have a simple requirement. I am running APScheduler as a separate process. I have another job producer script from which I want to add a job to the scheduler and run it.
This is my scheduler code,
# appsched.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()
This is my job producer script,
# jobproducer.py
from appsched import scheduler
def say_hello_job():
    print("Hello")

scheduler.add_job(say_hello_job, 'interval', minutes=1)
Needless to say, this did not work. Is there a way to make this work, perhaps by using a job store? How can I share one scheduler among multiple different job producers?
The APScheduler FAQ addresses this directly: "How do I share a single job store among one or more worker processes? Short answer: you can't. Long answer: sharing a persistent job store among two or more processes will lead to incorrect scheduler behavior like duplicate execution or the scheduler missing jobs."
APScheduler provides many different ways to configure the scheduler. You can use a configuration dictionary or you can pass in the options as keyword arguments. You can also instantiate the scheduler first, add jobs and configure the scheduler afterwards. This way you get maximum flexibility for any environment.
It only stops when you press Ctrl-C on your keyboard or send SIGINT to the process. BlockingScheduler is intended to be used when APScheduler is the only task running in the process: it blocks all other code from running unless that code runs in separate threads.
I had a similar problem, where my scheduler process was a uWSGI MULE process and there was a separate app where I wanted to add new jobs.
Looking at the BaseScheduler's add_job() function:
with self._jobstores_lock:
    if not self.running:
        self._pending_jobs.append((job, jobstore, replace_existing))
        self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
    else:
        self._real_add_job(job, jobstore, replace_existing, True)
you can see the problem: the scheduler adds jobs only when it is already started.
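The gating on self.running above can be reproduced with a minimal stand-in (plain Python, no APScheduler dependency; all names here are illustrative, not APScheduler's real internals) to see why jobs queue up forever when start() is never called:

```python
# Minimal stand-in for the add_job() gating logic quoted above.
class MiniScheduler:
    def __init__(self):
        self.running = False
        self._pending_jobs = []   # jobs parked until start()
        self._store = []          # stands in for a real job store

    def add_job(self, func):
        if not self.running:
            # Scheduler not started: the job is only queued in memory
            self._pending_jobs.append(func)
        else:
            self._store.append(func)

    def start(self):
        self.running = True
        # Flush everything that was added before start()
        while self._pending_jobs:
            self._store.append(self._pending_jobs.pop(0))

sched = MiniScheduler()
sched.add_job(lambda: "hello")
print(len(sched._store))   # 0 -- the job is stuck in _pending_jobs
sched.start()
print(len(sched._store))   # 1 -- flushed into the store on start()
```

In a producer process that never calls start(), every job stays in _pending_jobs and never reaches the shared store, which is exactly the failure mode here.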
The solution is fortunately quite simple: define our own "add-job-only" scheduler:
import six
from apscheduler.job import Job
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.util import undefined

class JobAddScheduler(BlockingScheduler):
    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs)
                          if value is not undefined)
        job = Job(self, **job_kwargs)

        # Add the job directly to the job store, bypassing the pending list
        with self._jobstores_lock:
            self._real_add_job(job, jobstore, replace_existing, True)
        return job

    # Neutralize the scheduler loop -- this instance only adds jobs
    def start(self):
        pass

    def shutdown(self, wait=True):
        pass

    def _main_loop(self):
        pass

    def wakeup(self):
        pass
Then we can add cron jobs instantaneously:
jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)
Don't forget to configure the scheduler! In my case I used the SQLAlchemy MySQL backend for storing jobs:
jobstores = dict(default=SQLAlchemyJobStore(url='mysql+pymysql://USER:PASSWORD@SERVER/DATABASE'))
jobscheduler.configure(jobstores=jobstores)
I'm not sure about the other job stores, but after adding a new job I had to call the wakeup() function of the separate scheduler process to "activate" the job. I achieved this using uWSGI's signal system.
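The wakeup step is really just a cross-process notification: the producer pokes the scheduler process so it re-reads the job store instead of sleeping until its previously computed wakeup time. Within a single process the same idea can be sketched with a threading.Event standing in for the uWSGI signal (all names here are illustrative):

```python
import threading
import time

wakeup_event = threading.Event()
job_store = []      # stands in for the shared persistent job store
picked_up = []      # jobs the scheduler loop has noticed

def scheduler_loop(stop):
    # Stand-in for the scheduler's main loop: sleep until woken,
    # then re-read the "job store" for newly added jobs
    while not stop.is_set():
        wakeup_event.wait(timeout=1.0)
        wakeup_event.clear()
        picked_up.extend(job_store)
        job_store.clear()

stop = threading.Event()
t = threading.Thread(target=scheduler_loop, args=(stop,))
t.start()

# Producer side: add a job, then wake the scheduler
job_store.append("say_hello_job")
wakeup_event.set()

time.sleep(0.2)
stop.set()
wakeup_event.set()   # unblock the loop so it can exit
t.join()
print(picked_up)     # ['say_hello_job']
```

Without the wakeup poke, the loop would only notice the new job on its next scheduled pass, which is the lag the uWSGI signal eliminates.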