I'm new to APScheduler and testing it before implementing it in a larger scope.
I have created the code below, but somehow my functions are never called when I use add_job with trigger='date'
. If I use trigger='interval'
, then everything works fine.
I also tried to play with run_date
param, with no luck.
Any idea about what could be wrong ?
Apscheduler is version 3.0.3
Many thanks in advance :)
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
def test():
print("{} ok ".format(datetime.now()))
def myFunc(content, img):
print("{} - content={}|image{}".format(datetime.now(), content, img))
myfile = open("scheduler.log", "a")
myfile.write("{} - content={}|image{}".format(datetime.now(), content, img))
myfile.close()
def main():
jobstores = \
{
'default': SQLAlchemyJobStore(url="postgresql+psycopg2://{}:{}@{}:{}/{}".format(db_user, db_password, db_host, db_port, db_database))
}
executors = \
{
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = \
{
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.start()
CURR_DATE = datetime.strptime(datetime.strftime(datetime.now(), '%Y%m%d%H%M'), '%Y%m%d%H%M')
JOB_DATE = CURR_DATE + timedelta(minutes=1)
uid=uuid.uuid4()
newjob = scheduler.add_job(myFunc,
trigger='date',
args=['content data', 'image data'],
kwargs=None,
id=str(uid),
name='test' + str(uid),
misfire_grace_time=5,
coalesce=False,
max_instances=1,
next_run_time= JOB_DATE,
jobstore='default',
executor='default',
replace_existing=True)
print("Added - {}".format(newjob))
scheduler.add_listener(my_listener, events.EVENT_JOB_EXECUTED | events.EVENT_JOB_ERROR)
scheduler.print_jobs()
while True:
sys.stdout.write('{}\n'.format(datetime.now())); sys.stdout.flush()
sleep(1)
if __name__ == "__main__":
main()
Problem is that you are using next_run_time= JOB_DATE, instead of run_date=JOB_DATE,
another option is declaring your trigger and passing it as a parameter to add job like this:
trigger = DateTrigger(run_date=start_date)
newjob = scheduler.add_job(myFunc,
trigger=trigger,
args=['content data', 'image data'],
kwargs=None,
id=str(uid),
name='test' + str(uid),
misfire_grace_time=5,
coalesce=False,
max_instances=1,
jobstore='default',
executor='default',
replace_existing=True)
another problem with your code is that while you made your scheduler timeaware you are using an unaware datetime
CURR_DATE = datetime.strptime(datetime.strftime(datetime.now(), '%Y%m%d%H%M'), '%Y%m%d%H%M')
JOB_DATE = CURR_DATE + timedelta(minutes=1) #this is unaware
try declaring your date this way:
import pytz
import datetime
job_date = datetime.datetime.now(pytz.UTC) + datetime.timedelta(minutes=1)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With