
Passing the current object to a Python APScheduler method

I'm trying to schedule a job every X hours from within a class. However, I'm not sure how to pass the current instance to the method, since it requires "self". I know that if I do it cron-style, I can pass an args list, but that hasn't worked either. Help?

class MyClass(object):
    @settings.scheduler.interval_schedule(hours=2)
    def post(self, first_argument=None):
        # do stuff
        self.cleanup()

Results in

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apscheduler/scheduler.py", line 510, in _run_job
    retval = job.func(*job.args, **job.kwargs)
TypeError: post() takes at least 1 argument (0 given)

Thanks.

Asked Aug 24 '13 at 17:08 by L-R




1 Answer

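One option is to leave the class undecorated and schedule a module-level function that creates an instance and calls the method: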

class MyClass(object):
    def post(self, first_argument=None):
        # do stuff
        self.cleanup()


@settings.scheduler.interval_schedule(hours=2)
def my_job(first_argument=None):
    my_class = MyClass()
    my_class.post(first_argument)

Alternatively, create the instance first and schedule its bound method, which already carries self:

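my_class = MyClass()
# APScheduler 3.x: trigger options are keyword arguments; a positional dict here would be taken as the job's args
scheduler.add_job(my_class.post, 'interval', seconds=3, kwargs={'first_argument': first_argument})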
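
To see why the decorator on the method fails while scheduling a bound method works, here is a minimal sketch that does not use APScheduler at all. TinyScheduler is a hypothetical stand-in written only for this illustration; it stores callables and later calls them as func(*args, **kwargs), which is the same call shown in the traceback. The cleaned counter is also an assumption added to make the effect visible.

```python
class TinyScheduler(object):
    """Hypothetical stand-in for a scheduler (not the APScheduler API)."""

    def __init__(self):
        self.jobs = []

    def interval_schedule(self, **trigger_args):
        # Decorator form: registers whatever callable it is handed, with no args.
        def decorator(func):
            self.jobs.append((func, (), {}))
            return func
        return decorator

    def add_job(self, func, args=(), kwargs=None):
        # Explicit form: registers a callable together with its arguments.
        self.jobs.append((func, tuple(args), dict(kwargs or {})))

    def run_all(self):
        # Call each job as func(*args, **kwargs); keep the result or the TypeError.
        results = []
        for func, args, kwargs in self.jobs:
            try:
                results.append(func(*args, **kwargs))
            except TypeError as exc:
                results.append(exc)
        return results


scheduler = TinyScheduler()


class MyClass(object):
    def __init__(self):
        self.cleaned = 0

    # At class-definition time the decorator receives the plain function.
    # No instance exists yet, so nothing can supply "self" when the job runs.
    @scheduler.interval_schedule(hours=2)
    def post(self, first_argument=None):
        self.cleanup()
        return first_argument

    def cleanup(self):
        self.cleaned += 1


my_class = MyClass()
# my_class.post is a bound method: it already carries my_class as "self".
scheduler.add_job(my_class.post, kwargs={'first_argument': 'hello'})

results = scheduler.run_all()
# results[0] is the TypeError from the decorated plain function;
# results[1] is the return value of the bound-method job.
```

One design difference between the two approaches: the wrapper function builds a new MyClass on every run, so no state survives between runs, while the bound method keeps using the same instance.
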
Answered Sep 21 '22 at 13:09 by alecxe